IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Datax 插件二次开发之parquet日志问题处理 -> 正文阅读

[大数据]Datax 插件二次开发之parquet日志问题处理

Datax 插件二次开发之parquet日志问题处理

Date: December 31, 2021

参考文档:

https://blog.csdn.net/wuleidaren/article/details/106395549

https://cloud.tencent.com/developer/ask/123020

1.问题背景

1.1 发现问题

前面对Datax进行了插件开发,能够正常使用,但是发现一个问题,每次执行时,在datax 的日志中会出现很多parquet的无效日志,类似下边:

在这里插入图片描述

这样,在执行时非常影响日志查看,有效信息被无效信息遮蔽。

1.2 确定问题

发现问题,就要寻找解决办法。

通过查询资料,发现很多仁兄都遇到过类似问题,但是解决方式要么没有,要么含含糊糊。

于是,就自己去看parquet源码,发现parquet项目本身存在日志的bug,里边的日志类用很老的java.util.logging.Logger而不是 apache log4j或者slfj 这种日志框架,而且里边出现了硬编码,导致出现很多日志无法被logback日志框架屏蔽。

parquet.hadoop包中日志打印:

在这里插入图片描述

parquet.hadoop中的日志类Log,实际使用的:

在这里插入图片描述

因为两个使用的日志类没有关系,不能根据日志等级进行日志屏蔽。

2.问题解决

2.1 寻找类似案例

不过查找,在这个仁兄的博客中看到一些希望:https://blog.csdn.net/wuleidaren/article/details/106395549
对方法二看了下,发现这个想法太不切实际。然后方法三又没有实际的案例,让人不知如下下手。

最后,黄天不负有心人,在腾讯云的这个博客:https://cloud.tencent.com/developer/ask/123020
看到有仁兄在spark上遇到类似的问题。一思考便知,实际两者的问题本质相同,只是出现在不同的地方,因此多次调整和尝试,最终还是解决了这个问题。

2.2 解决方案

在datax中配置控制台日志输出,通过类冲突,限制parquet无效日志的数据,经测试可行。

2.2.1 具体操作

1>创建文件
parquet-logging.properties ,存放到${DATAX_HOME}/conf 目录下,文件内容如下:

# Note: I'm certain not every line here is necessary. I just added them to cover all possible
# class/facility names.you will want to tailor this as per your needs.
.level=WARNING
java.util.logging.ConsoleHandler.level=WARNING

parquet.handlers=java.util.logging.ConsoleHandler
parquet.hadoop.handlers=java.util.logging.ConsoleHandler
org.apache.parquet.handlers=java.util.logging.ConsoleHandler
org.apache.parquet.hadoop.handlers=java.util.logging.ConsoleHandler

parquet.level=WARNING
parquet.hadoop.level=WARNING
org.apache.parquet.level=WARNING
org.apache.parquet.hadoop.level=WARNING

2>修改datax.py 执行脚本

isWindows 函数中添加parquet日志类的相关配置:

PAR_LOG_CONF_FILE = ("%s/conf/parquet-logging.properties" ) % (DATAX_HOME)

并修改DEFAULT_PROPERTY_CONF 变量:

DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener  -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s -Djava.util.logging.config.file=%s" % (
    DATAX_HOME, LOGBACK_FILE, PAR_LOG_CONF_FILE)

3>进行parquet任务测试

执行datax 的hdfsreader (支持parquet)或者 hdfsparquetwriter 的任务,例如:

python  ${DATAX_HOME}/bin/datax.py  test_hdfswriter_parquet.job  

查看执行日志:

在这里插入图片描述

发现已经没有parquet日志了。

4>再次确认方法可行

在datax的日志中,仔细看,能看到一些关于parquet类冲突的日志,如下:

在这里插入图片描述

说明,在使用java.util.logging.ConsoleHandler 作为parquet 日志处理类后,由于出现两个binding,datax种选择了后者【shaded.parquet.org.slf4j.helpers.NOPLoggerFactory】,从而屏蔽了warning以下级别的日志。

2.2.2 总结

自此,parquet日志问题得以解决。我们在解决相关问题的时候,要学会变通,不能一条路堵死,就走不下去了。

就像这里,datax屏蔽日志没有案例,我们可以找其他场景的类似案例;直接屏蔽日志屏蔽不了,就考虑使用日志绑定的方式,不适用parquet自带的日志类,从而曲线救国达到同样效果。

最后,附上 datax.py 的内容:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import codecs
import json
import os
import platform
import re
import signal
import socket
import subprocess
import sys
import time
from optparse import OptionGroup
from optparse import OptionParser
from string import Template

ispy2 = sys.version_info.major == 2

def isWindows():
    return platform.system() == 'Windows'

DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
PAR_LOG_CONF_FILE = ("%s/conf/parquet-logging.properties" ) % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener  -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s -Djava.util.logging.config.file=%s" % (
    DATAX_HOME, LOGBACK_FILE, PAR_LOG_CONF_FILE)

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}

def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"

def suicide(signum, e):
    global child_process
    if ispy2:
        print >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)
    else:
        print("[Error] DataX receive unexpected signal %d, starts to suicide." % (signum), sys.stderr)

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    if ispy2:
        print >> sys.stderr, "DataX Process was killed ! you did ?"
    else:
        print("DataX Process was killed ! you did ?", sys.stderr)
    sys.exit(RET_STATE["KILL"])

def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)

def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader", type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer", type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser

def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (
        reader, reader, reader)
    writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (
        writer, writer, writer)
    print(readerRef)
    print(writerRef)
    jobGuid = 'Please save the following configuration as a json file and  use\n     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print(jobGuid)
    jobTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "content": [
                {
                    "reader": {},
                    "writer": {}
                }
            ]
        }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
    try:
        readerPar = readPluginTemplate(readerTemplatePath)
    except:
        print("Read reader[%s] template error: can\'t find file %s" % (reader, readerTemplatePath))
    try:
        writerPar = readPluginTemplate(writerTemplatePath)
    except:
        print("Read writer[%s] template error: : can\'t find file %s" % (writer, writerTemplatePath))
    jobTemplate['job']['content'][0]['reader'] = readerPar
    jobTemplate['job']['content'][0]['writer'] = writerPar
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))

def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
        return json.load(f)

def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False

def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)

def printCopyright():
    print('''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

''' % DATAX_VERSION)
    sys.stdout.flush()

if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)

如果想下载文件,或者Datax 的 hdfsreader(支持parquet读) 和 hdfsparquetwriter (支持parquet写)相关插件,请关注我的代码仓库:https://gitee.com/jackielee4cn/DataX.git

欢迎批评指正或者Star 。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-29 23:09:15  更:2022-01-29 23:11:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:28:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码