Intro
Batch writing to Redis comes down to two points:
- split the data into partitions with Spark's foreachPartition
- write each partition to Redis in batches, committed through a Redis pipeline (a minimal sketch of the pipeline idea follows below)
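To make the pipeline point concrete, here is a minimal standalone sketch (an assumption: a local Redis at 127.0.0.1:6379 without a password; the demo_* keys are only for illustration). Commands are buffered on the client and sent to the server in a single round trip.

import redis

db = redis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)
pipe = db.pipeline()
for i in range(3):
    # queue the commands locally; nothing is sent yet
    pipe.set(f"demo_{i}", i)
# flush all buffered commands to the server in one round trip
pipe.execute()
print(db.get("demo_0"))  # '0'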
Code
Let's go straight to the code.
import redis
import json
import random
import pandas as pd
import functools
# 100 fake rows: keys test_000..test_099, values a small JSON string
key_list = [f"test_{str(i).zfill(3)}" for i in range(100)]
value_list = [json.dumps({"name": "jack", "sex": "male"})] * 100
df = pd.DataFrame({"key": key_list, "value": value_list})
df.head()
|   | key | value |
|---|---|---|
| 0 | test_000 | {"name": "jack", "sex": "male"} |
| 1 | test_001 | {"name": "jack", "sex": "male"} |
| 2 | test_002 | {"name": "jack", "sex": "male"} |
| 3 | test_003 | {"name": "jack", "sex": "male"} |
| 4 | test_004 | {"name": "jack", "sex": "male"} |
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(df)
spark_df.show()
+--------+--------------------+
| key| value|
+--------+--------------------+
|test_000|{"name": "jack", ...|
|test_001|{"name": "jack", ...|
|test_002|{"name": "jack", ...|
|test_003|{"name": "jack", ...|
|test_004|{"name": "jack", ...|
|test_005|{"name": "jack", ...|
|test_006|{"name": "jack", ...|
|test_007|{"name": "jack", ...|
|test_008|{"name": "jack", ...|
|test_009|{"name": "jack", ...|
|test_010|{"name": "jack", ...|
|test_011|{"name": "jack", ...|
|test_012|{"name": "jack", ...|
|test_013|{"name": "jack", ...|
|test_014|{"name": "jack", ...|
|test_015|{"name": "jack", ...|
|test_016|{"name": "jack", ...|
|test_017|{"name": "jack", ...|
|test_018|{"name": "jack", ...|
|test_019|{"name": "jack", ...|
+--------+--------------------+
only showing top 20 rows
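Before wiring in Redis, it can help to confirm what foreachPartition will hand to our function: each partition arrives as an iterator of Rows. A quick sketch using the spark_df built above (the counts shown are only an example):

sizes = (spark_df.repartition(3).rdd
         .mapPartitions(lambda part: [sum(1 for _ in part)])
         .collect())
print(sizes)  # e.g. [34, 33, 33] -- one row count per partition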
def insert2redis(part, batch=50, expire_time=60):
    """
    @param part: one RDD partition; each row has two fields, key and value
    @param batch: number of commands to buffer before each pipeline commit
    @param expire_time: key TTL in seconds
    @return:
    """
    db_param = {"host": '127.0.0.1', "port": 6379, "password": '12345', "db": 0}
    db = redis.Redis(host=db_param["host"],
                     port=db_param["port"],
                     password=db_param["password"],
                     db=db_param["db"],
                     encoding='utf-8',
                     decode_responses=True)
    pipe = db.pipeline()
    cnt = 0
    for row in part:
        # buffer one HSET plus an EXPIRE (TTL jittered so keys don't all expire at once)
        pipe.hset(name=row["key"], mapping=json.loads(row["value"]))
        pipe.expire(name=row["key"], time=expire_time + random.randint(0, 5))
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            # commit a full batch in one round trip
            pipe.execute()
            print(f"Rows {cnt - batch}-{cnt} written to Redis!")
    if cnt % batch != 0:
        # commit whatever is left over
        pipe.execute()
        print(f"Rows {cnt - cnt % batch}-{cnt} written to Redis!")
    pipe.close()
    db.close()
spark_df.repartition(3).rdd.foreachPartition(
functools.partial(insert2redis, batch=100, expire_time=60))
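For a quick sanity check after the job finishes, one of the written hashes can be read back from the driver (a sketch, assuming the same Redis connection parameters used in insert2redis; test_000 is one of the keys generated above):

db = redis.Redis(host="127.0.0.1", port=6379, password="12345", db=0, decode_responses=True)
print(db.hgetall("test_000"))  # {'name': 'jack', 'sex': 'male'}
print(db.ttl("test_000"))      # remaining TTL in seconds, at most expire_time + 5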
2022-04-24, Jiulonghu, Jiangning District, Nanjing