工作中使用hive进行数据预处理,有时候需要对数据抽样来做一些评估分析。下面记录通过python进行分层随机抽样的过程。
使用python连接数据库读取数据请参考:使用python连接数据库
import sqlalchemy
import pandas as pd
import numpy as np
# 1. 连接hive获取数据
def get_hive_data(sql):
user_name = "hive引擎地址"
hive_engine = sqlalchemy.create_engine(user_name)
data = pd.read_sql(sql, hive_engine)
print ("\n数据读取完毕\n")
return data
# 2. 确定抽样字段(入参项及其他变量字段), 读取数据
sql = """ select sgroup, var1, var2, var3 from table1 """
df = get_hive_data(sql)
# 3. 确认分层抽样的样本量。假设按group(枚举值grp1,grp2,grp3)分组抽样,每组抽样量如下:
sgroup 抽样量
grp1 300
grp2 200
grp3 100
# 4. 抽样代码
typicalNdict = { "grp1":300
,"grp2":200
,"grp3":100
}
df_all = df.groupby("sgroup", as_index=False)['var1'].count()
df_all = df_all.rename(columns={'var1':"grp_cnt"}
typicalNdict_df = pd.DataFrame({'sgroup' :list(typicalNdict.keys()),
's_grp_cnt':list(typicalNdict.values())
})
sample_df = df.merge(df_all, how='inner', on=['sgroup']).merge(typicalNdict_df , how='inner', on=['sgroup'])
def typicalSampling(group, typicalNdict):
name = group.name
n = typicalNdict[name]
return group.sample(n=n)
np.random.seed(seed=3032) # 设置随机数
rslt= df_sample.groupby('sgroup', group_keys=False).apply(typicalSampling, typicalNdict)
rslt['weight']=rslt['grp_cnt']/rslt['s_grp_cnt']
rslt.insert(0, 'id', np.arange(1, len(rslt)+1)
rslt.to_excel('sample_rslt.xlsx', index=False)
|