def exe_command(command, debug=False, note=""):
    """Run a shell command, echoing its output line by line as it arrives.

    The command is executed through the shell (``shell=True``) with stderr
    merged into stdout, so it must not be built from untrusted input.

    :param command: shell command line to execute
    :param debug: when true, only print the command and do not execute it
    :param note: label printed above the command before it runs
    :return: tuple ``(process, exitcode, output)``; ``process`` is the
        finished ``Popen`` object, ``exitcode`` its exit status and
        ``output`` the combined stdout/stderr text. In debug mode nothing is
        executed and ``(None, 0, "")`` is returned.
    """
    print(f"{note}\n{command}\n")
    if debug:
        # Dry run: nothing was started, so there is no process to report.
        # Previously this path reached the return statement with `process`
        # and `exitcode` unassigned.
        return None, 0, ""

    process = Popen(command, stdout=PIPE, stderr=STDOUT, shell=True)
    captured = []
    with process.stdout:
        # readline returns b'' only at EOF, which ends the iteration.
        for raw_line in iter(process.stdout.readline, b''):
            # Decode once; replace undecodable bytes instead of aborting the
            # whole run because of one badly encoded line.
            text = raw_line.decode(errors="replace")
            print(text.rstrip())
            captured.append(text)
    exitcode = process.wait()
    return process, exitcode, "".join(captured)
def exe_sql_command(sql_command, note="hive sql command", redirect=None, debug=False):
    """Run a Hive SQL statement via ``hive -e``, optionally dumping to a file.

    The statement is wrapped in double quotes on the shell command line, so
    it is still subject to shell expansion and must not itself contain
    unescaped double quotes.

    :param sql_command: SQL text passed to ``hive -e``
    :param note: label printed by ``exe_command`` above the command
    :param redirect: if given, the command's stdout is redirected (``>``) to
        this path
    :param debug: forwarded to ``exe_command`` (dry-run switch)
    :return: whatever ``exe_command`` returns (process, exit code, output)
    """
    command = f'hive -e "{sql_command}"'
    if redirect:
        command += f" > {redirect}"
    # Forward by keyword: `note` used to be accepted but never passed on, so
    # the caller's label was silently dropped.
    return exe_command(command, debug=debug, note=note)
def exe_sql_file(sql_file, note="hive sql file", redirect=None, debug=False):
    """Run a Hive SQL script via ``hive -f``, optionally dumping to a file.

    The script's contents are printed first so the executed SQL is visible
    in the console output.

    :param sql_file: path of the SQL script to execute
    :param note: label printed by ``exe_command`` above the command
    :param redirect: if given, the command's stdout is redirected (``>``) to
        this path
    :param debug: forwarded to ``exe_command`` (dry-run switch)
    :return: whatever ``exe_command`` returns (process, exit code, output)
    :raises OSError: if ``sql_file`` cannot be opened for reading
    """
    with open(file=sql_file, mode='r') as fp:
        content = fp.read()
    print(f"sql_file = {content}")

    command = f"hive -f {sql_file}"
    if redirect:
        command += f" > {redirect}"
    # Forward by keyword: `note` used to be accepted but never passed on, so
    # the caller's label was silently dropped.
    return exe_command(command, debug=debug, note=note)
def count_rows(table_name, debug=False):
    """Count the rows of a Hive table with ``select count(1)``.

    :param table_name: name of the table to count
    :param debug: when true, only print the command and do not run it
    :return: the raw stdout text of the ``hive`` command (not converted to
        an int); an empty string in debug mode, where nothing is executed
    """
    cmd = f"hive -e \"select count(1) from {table_name}\""
    print(cmd)
    if debug:
        # Dry run: previously this path reached the return statement with
        # `output` unassigned.
        return ""
    # The context manager closes the pipe even if read() raises.
    with os.popen(cmd) as process:
        return process.read()
def get_cols(table_name):
    """Return the column names of a Hive table as one comma-separated string.

    Runs an empty ``select *`` with header printing enabled and
    post-processes the header line in the shell.

    :param table_name: name of the table to inspect
    :return: stdout of the pipeline with all ``\\n`` and ``\\r`` characters
        removed
    """
    query = f"SET hive.cli.print.header=true;select * from {table_name} limit 0;"
    # Shell pipeline, spelled out explicitly:
    #   sed  - turn tab separators into commas and strip the "data." prefix
    #          (presumably the qualifier Hive puts before each column name;
    #          verify against the tables this is used on)
    #   grep - drop lines containing "WARN"
    # Backslashes are doubled so sed receives `\t` and `\.` without relying
    # on Python passing an invalid escape sequence through unchanged.
    command = (
        f'hive -e "{query}"'
        ' | sed -e "s/\\t/,/g;s/data\\.//g"'
        ' | grep -v "WARN"'
    )
    # The context manager closes the pipe even if read() raises.
    with os.popen(command) as process:
        output = process.read()
    return output.replace('\n', '').replace('\r', '')
|