IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【数据同步】读出 MySQL 数据,直接写入 ES -> 正文阅读

[大数据]【数据同步】读出 MySQL 数据,直接写入 ES

使用:

python rcdasebase_moe.py data_collection_base_00_db t_base_info_0
python rcdasebase_moe.py data_collection_base_00_db t_base_info_1
... ...

python rcdasebase_moe.py data_collection_base_99_db t_base_info_8
python rcdasebase_moe.py data_collection_base_99_db t_base_info_9

代码:

#!/bin/env python

import MySQLdb,MySQLdb.cursors
import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import sys

# start_time = datetime.datetime.strptime('2021-01-01', '%Y-%m-%d')
# stop_time = datetime.datetime.strptime('2021-07-01', '%Y-%m-%d')
# stop_time = datetime.datetime.strptime('2021-01-01', '%Y-%m-%d')
start_day = '2021-07-01'
stop_day = '2021-10-01'


# db_name = 'data_collection_base_99_db'
# table_name = 't_base_info_0'
db_name = sys.argv[1]
table_name = sys.argv[2]
index_name = 'af_collect_data_2021q3_new'

if not db_name.startswith('data_collection_base'):
    sys.exit(1)
if not table_name.startswith('t_base_info'):
    sys.exit(1)


def mysql2es():
    """
    读出 MySQL 数据,直接写入 ES
    """
    conn = MySQLdb.connect(host='MySQL_IP', port=MySQL_PORT, user='MySQL_USER', passwd='MySQL_PWD', charset='utf8')
    cur = conn.cursor()
    es_client = Elasticsearch(["ES_IP_1", "ES_IP_2"], port=ES_PORT, timeout=180)

    count = 0
    while True:
    
        f_day = (datetime.datetime.strptime(start_day, '%Y-%m-%d') + datetime.timedelta(days=count)).strftime("%Y-%m-%d")
        e_day = (datetime.datetime.strptime(start_day, '%Y-%m-%d') + datetime.timedelta(days=count+1)).strftime("%Y-%m-%d")
        print db_name, table_name, f_day

        sql = """
            select 
                concat(fuid, '_', Fscene_type,  '_', Fchannel_id , '_',  unix_timestamp(Fcreate_time)*1000 )  as id,
                Fuid as uid,
                Forder_id as orderId,
                Fscene_type as sceneType,
                Fchannel_id as channelId,
                Fchannel_name as channelName,
                Fapp_system as appSystem,
                Fdevice_name as deviceName,
                Fequipment_model as equipmentModel,
                Fidfa as idfa,
                Fimei as imei,
                Fis_agent as isAgent,
                Flatitude as latitude,
                Flongitude as longitude,
                Fgeohash as geohash,
                Flocal_qq as localQq,
                Flocal_tel as localTel,
                Fmac_code as macCode,
                Fnetwork_type as networkType,
                Fphone_mac_address as phoneMacAddress,
                Fsource as source,
                Fwater_num as waterNum,
                Fwifi_mac_address as wifiMacAddress,
                Fwifi_name as wifiName,
                Fip_address as ipAddress,
                Fcreate_time as createTime,
                Fcreate_time as actionTime
            from %s.%s 
            where Fmodify_time >='%s 00:00:00' and Fmodify_time < '%s 00:00:00';""" % (db_name, table_name, f_day, e_day)
        cur.execute(sql)
        result = cur.fetchall()
        
        # 上面是把数据从 MySQL 读出,存到 result 中。下面把 result 写到 ES 中。
        actions = []
        for i in result:
            try:
                c_time = i[25].strftime('%Y-%m-%d %H:%M:%S')
                a_time = i[26].strftime('%Y-%m-%d %H:%M:%S')
            except Exception as e:
                print e
                print i
                continue

        action =  {
            "_index" : index_name,
            "_type" : "collect_data_base_info",
            "_id" : i[0],
            "_source" : {
                      "uid" : i[1],
                      "orderId" : i[2],
                      "sceneType" : i[3],
                      "channelId" : i[4],
                      "channelName" : i[5],
                      "appSystem" : i[6],
                      "deviceName" : i[7],
                      "equipmentModel" : i[8],
                      "idfa" : i[9],
                      "imei" : i[10],
                      "isAgent" : i[11],
                      "latitude" : i[12],
                      "longitude" : i[13],
                      "geohash" : i[14],
                      "localQq" : i[15],
                      "localTel" : i[16],
                      "macCode" : i[17],
                      "networkType" : i[18],
                      "phoneMacAddress" : i[19],
                      "source" : i[20],
                      "waterNum" : i[21],
                      "wifiMacAddress" : i[22],
                      "wifiName" : i[23],
                      "ipAddress" : i[24],
                      "createTime" : c_time,
                      "actionTime" : a_time
                    }
        }
        actions.append(action)

        while True:
            try:
                helpers.bulk(es_client, actions)
                writef = 0
            except Exception as e:
                print e
                writef = 1
            if writef == 0:
                break 
        if e_day == stop_day:
            break

        count += 1


mysql2es()

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-12 17:36:19  更:2022-03-12 17:40:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:36:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码