IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Python应用随笔3——pyspark读写数据库 -> 正文阅读

[大数据]Python应用随笔3——pyspark读写数据库

零、前言

本文围绕Zeppelin中使用pyspark连接MySQL、PG(PostgreSQL)/GP(Greenplum)展开,简单教程,欢迎大佬评论补充。
PS: Zeppelin中需指定%pyspark对应解释器后再进行Python代码编写,见每个代码块首行

一、pyspark读取MySQL表

0. 此处使用zeppelin自带的连接方式,需在zeppelin服务器中安装好对应驱动,读取数据后转换为DataFrame格式展示;

1. MySQL连接信息配置(主机、端口、库名、用户名、密码)

%pyspark
# MySQL连接信息
my_host = 'XXXXXXX' # 主机
my_post = 'XXXX' # 端口
my_database = 'XXXXXX' # 数据库名
my_user = 'XXXXX' # 用户名
my_password = 'XXXXXXXXXX' # 密码
my_url = "jdbc:mysql://{host}:{post}/{database}?user={user}&password={password}".format(host=my_host, post=my_post, database=my_database, user=my_user, password=my_password)

2. 读取全表

%pyspark
# 城市信息表
# 读取全表
## .option(url, dbtable)参数说明:
## url为上述配置好的MySQL链接(此处为url=my_url)
## dbtable可以是表名,亦可以是sql查询语句(此处为表名,即读取全表)
dim_city = spark.read.format("jdbc").options(url=my_url, dbtable="dim_city").load().toPandas()

print(dim_city.info())
dim_city

输出:
在这里插入图片描述
3. 读取表的部分数据

  • 方法1
%pyspark
# 简单sql语句读取表的部分信息
# 注意:dbtable参数输入的sql语句编写完后需要加括号并赋值表名,格式如“(···sql···) tbx”
dim_city_zj = spark.read.format("jdbc").options(url=my_url, dbtable="(select * from dim_city where province='浙江') tb").load().toPandas()

print(dim_city_zj.info())
dim_city_zj

输出:
在这里插入图片描述

  • 方法2
%pyspark
# 复杂一些的sql可使用多行文本形式进行编写,可进行表连接
sql_city = '''
    (select tb1.city,
        tb1.district,
        tb2.city_level
    from dim_city tb1
    left join dim_city_info tb2
        on tb1.city = tb2.city
    where tb1.city = '杭州'
    ) tb
    '''
dim_city_hz = spark.read.format("jdbc").options(url=my_url, dbtable=sql_city).load().toPandas()

print(dim_city_hz.info())
dim_city_hz

输出:
在这里插入图片描述

二、pyspark读取PostgreSQL/Greenplum表

0. 此处涉及psycopg2、pandas两个包,直接使用pd.read_sql()方法读表,读取数据后为DataFrame格式;

1. 工具包、PG/GP连接信息配置

%pyspark
import psycopg2
import pandas as pd

# PG/GP连接信息
host = "XXXXXXX" # 主机
port = "XXXX" # 端口
user = "XXXXX" # 用户名
password = "XXXXXXXXXX" # 密码
database = "XXXXXX" # 数据库名
conn = psycopg2.connect(database=database, host=host, port=port, user=user, password=password)

2. sql取数

%pyspark
# 此处正常写入sql语句即可,无需像上述连接MySQL方法中额外加括号起表名
sql_city = '''
    select tb1.city,
        tb1.district,
        tb2.city_level
    from public.dim_city tb1
    left join public.dim_city_info tb2
        on tb1.city = tb2.city
    where tb1.city = '杭州'
    '''
dim_city_hz = pd.read_sql(sql_city, conn)

print(dim_city_hz.info())
dim_city_hz

输出:
在这里插入图片描述

三、拓展:pyspark数据写入(导出)到MySQL

此处使用上述结果表dim_city_hz

%pyspark
from pyspark.sql.types import *

# MySQL连接信息(同上文)
my_host = 'XXXXXXX' # 主机
my_post = 'XXXX' # 端口
my_database = 'XXXXXX' # 数据库名
my_user = 'XXXXX' # 用户名
my_password = 'XXXXXXXXXX' # 密码
my_url = "jdbc:mysql://{host}:{post}/{database}?user={user}&password={password}".format(host=my_host, post=my_post, database=my_database, user=my_user, password=my_password)

# 参数:用户名、密码
auth_mysql = {'user': my_user, 'password': my_password}
# 输出后的表名
table_name = 'dim_city_hz'
# 表结构定义
schema = StructType([StructField("city", StringType(), True),
                    StructField("district", StringType(), True),
                    StructField("city_level", StringType(), True)
                    ])
''' PS:
- 每个StructField()中传入3个参数值,分别为:表名、数据类型、是否可为空;
- 其他常用数据类型有:
data_type = {'整型': 'IntegerType()',
			'浮点型': 'DoubleType()',
			'日期': 'DateType()'}
'''
# 将pandas中的DF转换为spark中的DF
tmp_dim_city_hz = spark.createDataFrame(dim_city_hz, schema)
# 写入MySQL
print('数据正在写入MySQL……')
tmp_dim_city_hz.write.jdbc(my_url, table_name, mode='overwrite', properties=auth_mysql) # mode='append'可追加写入
print('写入MySQL完毕!')

输出:
在这里插入图片描述
MySQL中输出后的表:
在这里插入图片描述

四、结语

pyspark连接数据库的方法众多,本文仅以个人常用方法为示例,希望能给大家提供参考和帮助~

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-04 01:18:22  更:2022-09-04 01:20:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 0:02:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码