IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> pyspark.SparkSession查询hive数据写入mysql -> 正文阅读

[大数据]pyspark.SparkSession查询hive数据写入mysql

这篇文章主要是优化了hive的连接,不再是单节点连接hiveserver2.?

# -*- coding: utf-8 -*-

# created by say  2021-06-09
from pyspark.sql import SparkSession
import datetime
import logging
import pymysql
import sys, os

sys.path.append(os.getcwd())

isPrd = True
MP_DB_CONNECT_INFO = \
    {'host': '123456.mysql.aliyun.com', 'port': 3306, 'user': 'dev',
     'password': 'dev', 'database': 'db_dev',
     'charset': 'utf8'} if (isPrd) else \
        {'host': '192.168.1.110', 'port': 3306, 'user': 'dev', 'password': 'dev', 'database': 'dev',
         'charset': 'utf8'}
# hive链接信息
HIVE_CONNECT_INFO = {'host': 'thrift://192.168.2.112:9083,thrift://192.168.2.114:9083'} \
        if (isPrd) else {'host': 'thrift://192.168.1.142:9083,thrift://192.168.1.112:9083'}
# 初始化
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 生成日志句柄
logger = logging.getLogger("user_active_count")


class dataMp(object):

    def __init__(self):
        # spark ss
        self.sc = SparkSession.builder \
            .config("hive.metastore.uris", HIVE_CONNECT_INFO.get("host")) \
            .appName("spark_stat_di") \
            .enableHiveSupport() \
            .getOrCreate()
        # 连接mysql
        self._conn = pymysql.connect(host=MP_DB_CONNECT_INFO.get('host'),
                                     port=MP_DB_CONNECT_INFO.get('port'),
                                     user=MP_DB_CONNECT_INFO.get('user'),
                                     passwd=MP_DB_CONNECT_INFO.get('password'),
                                     db=MP_DB_CONNECT_INFO.get('database'),
                                     charset=MP_DB_CONNECT_INFO.get('charset')
                                     )
        self._cursor = self._conn.cursor()

    def query(self, sql):
        logger.info(sql)
        try:
            self._cursor.execute(sql)
            self._conn.commit()
            result = self._cursor.fetchall()
        except pymysql.Error as e:
            logger.info(e)
            result = None
        print(result)
        return result

    # 每天运行查询的数据
    def getLabelCount(self, month):
        sql = """
                select concat(substr(month,1,4),'-',substr(month,5,6)) as stat_month,count(user_id) as user_count 
            from dws.dws_user_active_smd where month ={month} group by concat(substr(month,1,4),'-',substr(month,5,6))
            """.format(month=month)
        logger.info(sql)
        result = self.sc.sql(sql).collect()
        size = len(result)
        logger.info("数据的数量:" + str(size))
        if size == 0:
            return 0
        return result

    def realTimeLabelDay(self, month):
        resultCount = self.getLabelCount(month)
        logger.info("统计结果:" + str(resultCount))
        if resultCount == 0:
            logger.info("查询无数据~")
            return 0
        self.forRun(resultCount)

    # 每天运行查询的数据
    def initCount(self, month):
        sql = """
                select concat(substr(month,1,4),'-',substr(month,5,6)) as stat_month,count(xdm_user_id) as user_count 
            from dws.dws_user_active_smd where month <={month} group by concat(substr(month,1,4),'-',substr(month,5,6))
            """.format(month=month)
        logger.info(sql)
        result = self.sc.sql(sql).collect()
        size = len(result)
        logger.info("数据的数量:" + str(size))
        if size == 0:
            return 0
        return result

    def forRun(self, resultCount):
        for i in range(0, len(resultCount)):
            logger.info(resultCount[i])
            user_month = str(resultCount[i][0])
            user_count = resultCount[i][1]
            query_sql = """
                    select id from active_count_cm 
                where stat_month = '{stat_month}' limit 1
            """.format(stat_month=user_month)
            insert_sql = """
                    insert into active_count_cm (active_count, stat_month ) 
                values ('{user_count}', '{user_month}')
            """.format(user_count=user_count, user_month=user_month)
            query_result = self.query(query_sql)

            if query_result is None or query_result == ():
                logger.info("inset语句: " + insert_sql)
                self._cursor.execute(insert_sql)
                self._conn.commit()
                logger.info("inset影响行: " + str(self._cursor.lastrowid))
            else:
                id = query_result[0][0]
                update_sql = """
                    update active_count_cm set active_count={user_count}, stat_month='{user_month}' 
                        where id = {id}
                """.format(user_count=user_count, user_month=user_month, id=id)
                logger.info("update语句: " + update_sql)
                self._cursor.execute(update_sql)
                self._conn.commit()
                logger.info("update影响行: " + str(self._cursor.rowcount))  # 影响行数
        self._cursor.close()

    def initRun(self, month):
        resultCount = self.initCount(month)
        logger.info("统计结果:" + str(resultCount))
        if resultCount == 0:
            logger.info("查询无数据~")
            return 0
        self.forRun(resultCount)


if __name__ == '__main__':
    db = dataMp()
    if len(sys.argv) > 1:
        dt = sys.argv[1]  # 每天运行
        stat_month = dt[0:7]
        month = stat_month.replace('-', '')
        logger.info("运行统计月:" + stat_month + ", 运行月:" + month)
        db.realTimeLabelDay(month)
    else:
        stat_month = (datetime.datetime.now() + datetime.timedelta(hours=0)).strftime("%Y-%m")
        month = (datetime.datetime.now() + datetime.timedelta(hours=0)).strftime("%Y%m")
        logger.info("现在运行初始化, 运行日期:" + month + "!!!!")
        db.initRun(month)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-22 13:36:15  更:2021-08-22 13:36:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 12:50:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码