IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Datax 往 hdfs 写数据配置 HA 高可用 -> 正文阅读

[大数据]Datax 往 hdfs 写数据配置 HA 高可用

Datax 往 hdfs 写数据配置 HA 高可用

配置脚本

vim gen_import_config_ha.py

内容 :

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host = "cpu102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "xxxxx"

#HDFS HA 相关配置,需根据实际情况作出修改
my_nameservices = "mycluster"
my_namenodes_1 = "nn1"
my_namenodes_2 = "nn2"
my_rpc_address_1 = "cpu101:8020"
my_rpc_address_2 = "cpu102:8020"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/import"

#获取mysql连接
def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)

#获取表格的元数据  包含列名和数据类型
def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall

#获取mysql表的列名
def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))

#将获取的元数据中 mysql 的数据类型转换为 hive 的数据类型  写入到 hdfswriter 中
def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)

#生成json文件
def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS":"hdfs://" + my_nameservices,
						"hadoopConfig":{
						  "dfs.nameservices": my_nameservices,
						  "dfs.ha.namenodes." + my_nameservices: my_namenodes_1 + "," + my_namenodes_2,
						  "dfs.namenode.rpc-address." + my_nameservices + "." + my_namenodes_1 : my_rpc_address_1,
						  "dfs.namenode.rpc-address." + my_nameservices + "." + my_namenodes_2: my_rpc_address_2,
						  "dfs.client.failover.proxy.provider." + my_nameservices: "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
						},
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

权限 :

chmod 777 gen_import_config_ha.py

配置全表生成脚本

vim gen_import_config_ha.sh
#!/bin/bash

python ~/bin/gen_import_config_ha.py -d gmall -t activity_info
python ~/bin/gen_import_config_ha.py -d gmall -t activity_rule
python ~/bin/gen_import_config_ha.py -d gmall -t base_category1
python ~/bin/gen_import_config_ha.py -d gmall -t base_category2
python ~/bin/gen_import_config_ha.py -d gmall -t base_category3
python ~/bin/gen_import_config_ha.py -d gmall -t base_dic
python ~/bin/gen_import_config_ha.py -d gmall -t base_province
python ~/bin/gen_import_config_ha.py -d gmall -t base_region
python ~/bin/gen_import_config_ha.py -d gmall -t base_trademark
python ~/bin/gen_import_config_ha.py -d gmall -t cart_info
python ~/bin/gen_import_config_ha.py -d gmall -t coupon_info
python ~/bin/gen_import_config_ha.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config_ha.py -d gmall -t sku_info
python ~/bin/gen_import_config_ha.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config_ha.py -d gmall -t spu_info

在这里插入图片描述

权限

chmod 777 gen_import_config_ha.sh

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-30 18:32:03  更:2022-03-30 18:33:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:43:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码