# 这篇文章主要优化了 Hive 的连接方式：不再以单节点方式连接 HiveServer2。
# -*- coding: utf-8 -*-
# created by say 2021-06-09
from pyspark.sql import SparkSession
import datetime
import logging
import pymysql
import sys, os
# Make modules located in the current working directory importable.
sys.path.append(os.getcwd())

# Environment switch: True selects the first (production) set of endpoints.
isPrd = True

# NOTE(review): credentials are hard-coded in this module.
if isPrd:
    # MySQL connection settings.
    MP_DB_CONNECT_INFO = {
        'host': '123456.mysql.aliyun.com',
        'port': 3306,
        'user': 'dev',
        'password': 'dev',
        'database': 'db_dev',
        'charset': 'utf8',
    }
    # Hive connection settings (comma-separated metastore thrift URIs).
    HIVE_CONNECT_INFO = {
        'host': 'thrift://192.168.2.112:9083,thrift://192.168.2.114:9083',
    }
else:
    # MySQL connection settings.
    MP_DB_CONNECT_INFO = {
        'host': '192.168.1.110',
        'port': 3306,
        'user': 'dev',
        'password': 'dev',
        'database': 'dev',
        'charset': 'utf8',
    }
    # Hive connection settings (comma-separated metastore thrift URIs).
    HIVE_CONNECT_INFO = {
        'host': 'thrift://192.168.1.142:9083,thrift://192.168.1.112:9083',
    }

# Initialise root logging.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Module-wide logger handle.
logger = logging.getLogger("user_active_count")
class dataMp(object):
    """Copy monthly active-user counts from Hive (via Spark SQL) into MySQL.

    Construction opens a Hive-enabled SparkSession (``self.sc``) and a PyMySQL
    connection with one cursor. ``realTimeLabelDay`` / ``initRun`` run the
    aggregation in Spark and hand the rows to ``forRun``, which inserts or
    updates one row per month in the ``active_count_cm`` table.
    """

    def __init__(self):
        # Spark session pointed at the configured Hive metastore URIs.
        self.sc = SparkSession.builder \
            .config("hive.metastore.uris", HIVE_CONNECT_INFO.get("host")) \
            .appName("spark_stat_di") \
            .enableHiveSupport() \
            .getOrCreate()
        # MySQL connection and the single cursor shared by all methods.
        self._conn = pymysql.connect(host=MP_DB_CONNECT_INFO.get('host'),
                                     port=MP_DB_CONNECT_INFO.get('port'),
                                     user=MP_DB_CONNECT_INFO.get('user'),
                                     passwd=MP_DB_CONNECT_INFO.get('password'),
                                     db=MP_DB_CONNECT_INFO.get('database'),
                                     charset=MP_DB_CONNECT_INFO.get('charset')
                                     )
        self._cursor = self._conn.cursor()

    @staticmethod
    def _checkMonth(month):
        """Return ``month`` as text after verifying it contains only ASCII digits.

        The value is interpolated into Spark SQL text, so anything other than
        digits is rejected up front.

        :raises ValueError: if ``month`` is empty or contains a non-digit.
        """
        text = str(month)
        if not text or any(ch not in "0123456789" for ch in text):
            raise ValueError(
                "month must contain only digits (e.g. 202106), got %r" % (month,))
        return text

    def query(self, sql, args=None):
        """Execute ``sql`` on the MySQL cursor, commit, and return all fetched rows.

        :param sql: statement text; may contain ``%s`` placeholders.
        :param args: optional parameters bound to the placeholders by the driver.
        :return: the fetched rows, or ``None`` if PyMySQL raised an error.
        """
        # Log the statement as it will be sent when parameters are supplied.
        logger.info(sql if args is None else self._cursor.mogrify(sql, args))
        try:
            self._cursor.execute(sql, args)
            self._conn.commit()
            result = self._cursor.fetchall()
        except pymysql.Error as e:
            # Keep the "None on failure" contract, but log it as an error.
            logger.exception(e)
            result = None
        print(result)
        return result

    def _collectCounts(self, sql):
        """Run ``sql`` through Spark and return the collected rows, or 0 if empty."""
        logger.info(sql)
        rows = self.sc.sql(sql).collect()
        logger.info("数据的数量:" + str(len(rows)))
        if not rows:
            return 0
        return rows

    # Query used by the daily run.
    def getLabelCount(self, month):
        """Count users for exactly ``month`` (digits, e.g. ``202106``).

        :return: list of collected rows ``(stat_month, user_count)``, or 0 when
            the query yields no rows.
        :raises ValueError: if ``month`` is not made of digits only.
        """
        sql = """
            select concat(substr(month,1,4),'-',substr(month,5,6)) as stat_month,count(user_id) as user_count
            from dws.dws_user_active_smd where month ={month} group by concat(substr(month,1,4),'-',substr(month,5,6))
        """.format(month=self._checkMonth(month))
        return self._collectCounts(sql)

    def realTimeLabelDay(self, month):
        """Aggregate the single ``month`` and store the result in MySQL.

        :return: 0 when the query produced no rows, otherwise ``None``.
        """
        return self._store(self.getLabelCount(month))

    # Query used by the initial (backfill) run.
    def initCount(self, month):
        """Count users per month for every month up to and including ``month``.

        NOTE(review): this counts ``xdm_user_id`` while ``getLabelCount`` counts
        ``user_id`` -- confirm which column is intended.

        :return: list of collected rows ``(stat_month, user_count)``, or 0 when
            the query yields no rows.
        :raises ValueError: if ``month`` is not made of digits only.
        """
        sql = """
            select concat(substr(month,1,4),'-',substr(month,5,6)) as stat_month,count(xdm_user_id) as user_count
            from dws.dws_user_active_smd where month <={month} group by concat(substr(month,1,4),'-',substr(month,5,6))
        """.format(month=self._checkMonth(month))
        return self._collectCounts(sql)

    def forRun(self, resultCount):
        """Insert or update one ``active_count_cm`` row per entry of ``resultCount``.

        :param resultCount: iterable of rows whose first item is the month label
            and whose second item is the user count.

        The shared cursor is closed when this method finishes, including when a
        statement raises.
        """
        select_sql = "select id from active_count_cm where stat_month = %s limit 1"
        insert_sql = "insert into active_count_cm (active_count, stat_month) values (%s, %s)"
        update_sql = "update active_count_cm set active_count=%s, stat_month=%s where id = %s"
        try:
            for row in resultCount:
                logger.info(row)
                user_month = str(row[0])
                user_count = row[1]
                existing = self.query(select_sql, (user_month,))
                if not existing:
                    # No row for this month yet. As before, this branch is also
                    # taken when the lookup itself failed and returned None.
                    params = (user_count, user_month)
                    logger.info("inset语句: " + self._cursor.mogrify(insert_sql, params))
                    self._cursor.execute(insert_sql, params)
                    self._conn.commit()
                    logger.info("inset影响行: " + str(self._cursor.lastrowid))
                else:
                    row_id = existing[0][0]
                    params = (user_count, user_month, row_id)
                    logger.info("update语句: " + self._cursor.mogrify(update_sql, params))
                    self._cursor.execute(update_sql, params)
                    self._conn.commit()
                    logger.info("update影响行: " + str(self._cursor.rowcount))  # affected rows
        finally:
            self._cursor.close()

    def initRun(self, month):
        """Aggregate all months up to ``month`` and store the results in MySQL.

        :return: 0 when the query produced no rows, otherwise ``None``.
        """
        return self._store(self.initCount(month))

    def _store(self, resultCount):
        """Log ``resultCount`` and pass it to ``forRun`` unless it is the 0 marker."""
        logger.info("统计结果:" + str(resultCount))
        if resultCount == 0:
            logger.info("查询无数据~")
            return 0
        self.forRun(resultCount)
def parseRunMonth(dt):
    """Derive the month to process from a run-date argument.

    Accepts ``YYYY-MM-DD``, ``YYYY-MM`` and the compact ``YYYYMMDD`` /
    ``YYYYMM`` forms: dashes are removed first and the leading six digits are
    taken as the month.

    :param dt: run date as text.
    :return: tuple ``(stat_month, month)``, e.g. ``("2021-06", "202106")``.
    :raises ValueError: if the leading six characters are not a valid ``YYYYMM``.
    """
    month = dt.replace('-', '')[:6]
    valid = (len(month) == 6
             and all(ch in "0123456789" for ch in month)
             and 1 <= int(month[4:]) <= 12)
    if not valid:
        raise ValueError(
            "invalid run date %r; expected YYYY-MM-DD, YYYY-MM or YYYYMMDD" % (dt,))
    return month[:4] + '-' + month[4:], month


if __name__ == '__main__':
    # Resolve (and validate) the month before opening any connection.
    run_date = sys.argv[1] if len(sys.argv) > 1 else None
    if run_date is not None:
        stat_month, month = parseRunMonth(run_date)
    else:
        month = datetime.datetime.now().strftime("%Y%m")

    db = dataMp()
    try:
        if run_date is not None:
            # A date argument was given: process that single month.
            logger.info("运行统计月:" + stat_month + ", 运行月:" + month)
            db.realTimeLabelDay(month)
        else:
            # No argument: initialise every month up to the current one.
            logger.info("现在运行初始化, 运行日期:" + month + "!!!!")
            db.initRun(month)
    finally:
        # Release the Spark session even when the run fails.
        db.sc.stop()
|