使用:
python rcdasebase_moe.py data_collection_base_00_db t_base_info_0
python rcdasebase_moe.py data_collection_base_00_db t_base_info_1
... ...
python rcdasebase_moe.py data_collection_base_99_db t_base_info_8
python rcdasebase_moe.py data_collection_base_99_db t_base_info_9
代码:
import MySQLdb,MySQLdb.cursors
import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import sys
# Migration window. mysql2es() copies rows whose Fmodify_time falls in
# [start_day, stop_day), issuing one query per calendar day.
start_day = '2021-07-01'
stop_day = '2021-10-01'

# Destination Elasticsearch index.
index_name = 'af_collect_data_2021q3_new'

# Both positional arguments are required; fail with a usage line instead of
# an IndexError traceback when one is missing.
if len(sys.argv) < 3:
    sys.exit('usage: python %s <db_name> <table_name>' % sys.argv[0])

db_name = sys.argv[1]
table_name = sys.argv[2]

# db_name and table_name are interpolated into the SQL text as identifiers
# (identifiers cannot be bound as query parameters), so besides the expected
# prefix they must consist only of letters, digits and underscores.
for _value, _prefix in ((db_name, 'data_collection_base'),
                        (table_name, 't_base_info')):
    if not _value.startswith(_prefix) or not _value.replace('_', '').isalnum():
        # sys.exit(<str>) writes the message to stderr and exits with status 1,
        # the same exit status the script used before.
        sys.exit('invalid argument %r: expected a plain identifier starting '
                 'with %r' % (_value, _prefix))
def mysql2es():
    """Read rows from MySQL and write them straight into Elasticsearch.

    The table ``db_name.table_name`` is scanned one calendar day at a time
    over ``[start_day, stop_day)`` (filtering on ``Fmodify_time``).  Each
    day's rows are converted to bulk actions and indexed into ``index_name``.

    Relies on the module-level names ``db_name``, ``table_name``,
    ``start_day``, ``stop_day`` and ``index_name``.

    Raises:
        Exception: whatever ``helpers.bulk`` raised on the final attempt if a
            day's batch still fails after all retries.
    """
    # NOTE(review): host/user/password are placeholder strings and
    # MySQL_PORT / ES_PORT are not defined anywhere in the visible file;
    # real values must be supplied before this can run.
    conn = MySQLdb.connect(host='MySQL_IP', port=MySQL_PORT, user='MySQL_USER', passwd='MySQL_PWD', charset='utf8')
    try:
        cur = conn.cursor()
        try:
            es_client = Elasticsearch(["ES_IP_1", "ES_IP_2"], port=ES_PORT, timeout=180)

            # Identifiers cannot be bound as parameters, so they are formatted
            # in once here; the two date bounds stay as %s placeholders and
            # are passed to execute() so the driver quotes them.
            query = """
                select
                    concat(fuid, '_', Fscene_type, '_', Fchannel_id , '_', unix_timestamp(Fcreate_time)*1000 ) as id,
                    Fuid as uid,
                    Forder_id as orderId,
                    Fscene_type as sceneType,
                    Fchannel_id as channelId,
                    Fchannel_name as channelName,
                    Fapp_system as appSystem,
                    Fdevice_name as deviceName,
                    Fequipment_model as equipmentModel,
                    Fidfa as idfa,
                    Fimei as imei,
                    Fis_agent as isAgent,
                    Flatitude as latitude,
                    Flongitude as longitude,
                    Fgeohash as geohash,
                    Flocal_qq as localQq,
                    Flocal_tel as localTel,
                    Fmac_code as macCode,
                    Fnetwork_type as networkType,
                    Fphone_mac_address as phoneMacAddress,
                    Fsource as source,
                    Fwater_num as waterNum,
                    Fwifi_mac_address as wifiMacAddress,
                    Fwifi_name as wifiName,
                    Fip_address as ipAddress,
                    Fcreate_time as createTime,
                    Fcreate_time as actionTime
                from %s.%s
                where Fmodify_time >= %%s and Fmodify_time < %%s;""" % (db_name, table_name)

            # Parse the window bounds once instead of on every iteration.
            day = datetime.datetime.strptime(start_day, '%Y-%m-%d')
            end = datetime.datetime.strptime(stop_day, '%Y-%m-%d')
            one_day = datetime.timedelta(days=1)

            # "<" rather than an equality test on the end date, so an empty
            # or inverted window terminates instead of looping forever.
            while day < end:
                next_day = day + one_day
                f_day = day.strftime("%Y-%m-%d")
                e_day = next_day.strftime("%Y-%m-%d")
                print('%s %s %s' % (db_name, table_name, f_day))

                cur.execute(query, (f_day + ' 00:00:00', e_day + ' 00:00:00'))
                rows = cur.fetchall()

                actions = []
                for row in rows:
                    action = _row_to_action(row)
                    if action is not None:
                        actions.append(action)

                if actions:
                    _bulk_with_retry(es_client, actions)

                day = next_day
        finally:
            cur.close()
    finally:
        conn.close()


# Names of the _source fields filled from result columns 1..24, in column order.
_SOURCE_FIELDS = (
    "uid", "orderId", "sceneType", "channelId", "channelName", "appSystem",
    "deviceName", "equipmentModel", "idfa", "imei", "isAgent", "latitude",
    "longitude", "geohash", "localQq", "localTel", "macCode", "networkType",
    "phoneMacAddress", "source", "waterNum", "wifiMacAddress", "wifiName",
    "ipAddress",
)


def _row_to_action(row):
    """Convert one result row into a bulk-index action, or None if unusable."""
    try:
        c_time = row[25].strftime('%Y-%m-%d %H:%M:%S')
        a_time = row[26].strftime('%Y-%m-%d %H:%M:%S')
    except Exception as e:
        # Best effort: a row whose timestamps cannot be formatted is logged
        # and skipped (presumably a NULL Fcreate_time - TODO confirm).
        print(e)
        print(row)
        return None

    source = dict(zip(_SOURCE_FIELDS, row[1:25]))
    source["createTime"] = c_time
    source["actionTime"] = a_time
    return {
        "_index": index_name,
        "_type": "collect_data_base_info",
        "_id": row[0],
        "_source": source,
    }


def _bulk_with_retry(es_client, actions, max_attempts=8, base_delay=1.0):
    """Bulk-index actions, retrying with exponential backoff, then re-raise."""
    import time  # local import: the file's import block does not provide it

    for attempt in range(1, max_attempts + 1):
        try:
            helpers.bulk(es_client, actions)
            return
        except Exception as e:
            print(e)
            if attempt == max_attempts:
                # A failure that survives every retry is not transient;
                # surface it instead of spinning forever.
                raise
            # Every action carries an explicit _id, so re-sending the whole
            # batch overwrites documents rather than duplicating them.
            time.sleep(min(base_delay * 2 ** (attempt - 1), 60))
# Run the migration only when executed as a script, so that importing this
# module does not immediately start copying data.
if __name__ == "__main__":
    mysql2es()
|