IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> PysparkNote005---批量写入Redis -> 正文阅读

[大数据]PysparkNote005---批量写入Redis

Intro

??批量写redis,主要考虑两个点:

  • 数据分批-spark的foreachPartition
  • 批数据的分批插入redis,redis pipeline提交

Code

直接看代码

import redis
import json
import random
import pandas as pd
import functools
key_list = [f"test_{str(i).zfill(3)}" for i in range(100)]
value_list = [json.dumps({"name":"jack","sex":"male"})] * 100
df = pd.DataFrame({"key":key_list,"value":value_list})
df.head()
keyvalue
0test_000{"name": "jack", "sex": "male"}
1test_001{"name": "jack", "sex": "male"}
2test_002{"name": "jack", "sex": "male"}
3test_003{"name": "jack", "sex": "male"}
4test_004{"name": "jack", "sex": "male"}
from pyspark.sql import SparkSession
spark_df = spark.createDataFrame(df)
spark_df.show()
+--------+--------------------+
|     key|               value|
+--------+--------------------+
|test_000|{"name": "jack", ...|
|test_001|{"name": "jack", ...|
|test_002|{"name": "jack", ...|
|test_003|{"name": "jack", ...|
|test_004|{"name": "jack", ...|
|test_005|{"name": "jack", ...|
|test_006|{"name": "jack", ...|
|test_007|{"name": "jack", ...|
|test_008|{"name": "jack", ...|
|test_009|{"name": "jack", ...|
|test_010|{"name": "jack", ...|
|test_011|{"name": "jack", ...|
|test_012|{"name": "jack", ...|
|test_013|{"name": "jack", ...|
|test_014|{"name": "jack", ...|
|test_015|{"name": "jack", ...|
|test_016|{"name": "jack", ...|
|test_017|{"name": "jack", ...|
|test_018|{"name": "jack", ...|
|test_019|{"name": "jack", ...|
+--------+--------------------+
only showing top 20 rows
def insert2redis(part,  batch=50, expire_time=60):
    """
    @param part: rdd part;两列值key、value
    @param batch: 批量写入的数量
    @param expire_time: 过期时间
    @return:
    """
    db_param = {"host": '127.0.0.1', "port": 6379, "password": '12345', "db": 0}

    db = redis.Redis(host=db_param["host"],
                     port=db_param["port"],
                     password=db_param["password"],
                     db=db_param["db"],
                     encoding='utf-8',
                     decode_responses=True)
    pipe = db.pipeline()
    cnt = 0
    for row in part:
        pipe.hset(name=row["key"], mapping=json.loads(row["value"])) # 以字典形式写入
        pipe.expire(name=row["key"], time=expire_time + random.randint(0, 5))  # 过期时间随机化,防止批量过期
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            pipe.execute()
            print(f"第{cnt - batch}-{cnt}行数据插入redis!")
    # 最后一波数据如果不是batch余数,也推过去
    if cnt % batch != 0:
        pipe.execute()
        print(f"第{cnt - cnt % batch}-{cnt}行数据插入redis!")
    pipe.close()
    db.close()

spark_df.repartition(3).rdd.foreachPartition(
        functools.partial(insert2redis, batch=100, expire_time=60))

在这里插入图片描述

Ref

[1] https://testerhome.com/topics/25448
[2] https://blog.csdn.net/sinat_15793123/article/details/80594748
????????????????????????????????2022-04-24 于南京市江宁区九龙湖

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-26 11:47:11  更:2022-04-26 11:50:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 10:51:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码