IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Mongodb数据导入集群hive数仓 -> 正文阅读

[大数据]Mongodb数据导入集群hive数仓

hive方式映射数据

?官方文档:mongo-hadoop官方文档

  1. 组件版本要求:
    • Hadoop 1.X版本必须是1.2及以上版本
    • Hadoop 2.X版本必须是2.4及以上版本
    • Hive版本必须是1.1及以上版本
    • 依赖的mongodb java dirver 版本必须是3.0.0及以上版本
  2. 依赖的jar包下载地址,根据需求选择不同版本:
  3. hive引入依赖jar包,以我的版本为例,在hiveCli窗口运行以下命令:
   ADD JAR /path-to/mongo-hadoop-core-2.0.2.jar;
   ADD JAR /path-to/mongo-hadoop-hive-2.0.2.jar;
   ADD JAR /path-to/mongo-java-driver-3.9.1.jar;

使用list jars命令查看jar包是否加载成功,如下。
在这里插入图片描述4. 创建hive与mongodb表映射关系,模板如下:

CREATE [EXTERNAL] TABLE <tablename>
(<schema>)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
[WITH SERDEPROPERTIES('mongo.columns.mapping'='<JSON mapping>')]
TBLPROPERTIES('mongo.uri'='<MongoURI>');CREATE [EXTERNAL] TABLE <tablename>
(<schema>)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
[WITH SERDEPROPERTIES('mongo.columns.mapping'='<JSON mapping>')]
TBLPROPERTIES('mongo.properties.path'='HiveTable.properties');
  • 使用mongo.columns.mapping选项指定对应关系,将 Hive列和Hive字段类型映射到MongoDB相应的表字段和类型。例如’mongo.columns.mapping’=‘{“id”:“_id”,“work.title”:“job.position”,“class”:“class”}’,mapping左边对于hive的字段名,右边对应mongodb字段名,如hive字段和mongodb字段相同且符合hive命名规则,则可以不写。
  • 建议创建表时使用EXTERNAL外部表的形式,因为hive表被删除时,只删除元数据,而底层的mongodb原数据保持不变;如果创建的是内部表,则删除hive表的同时会删除与hive表关联的元数据和底层的mongodb原数据。为防止误删等危险操作导致数据丢失,建议使用外部表。
  • 参数mongo.uri为MongoDB的连接字符串,会将uri连接串存储在表中。如果你的连接串中包含凭据(例如用户名:密码)作为连接字符串的一部分进行传递时,建议使用参数mongo.properties.path指定定连接字符串在属性文件路径。
  • 标准的mongo.uri为:mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]],详细了解可查看官网mongodb官网
  • 对于mongodb的如果用户名或密码包含以下字符:: / ? # [ ] @时,这些字符必须使用转换
    百分比编码,提供转换工具,其实官网也有对这种情况作出说明。
    在这里插入图片描述5. 实例演示:
  • mongodb数据库数据和字段
    在这里插入图片描述
  • 进入hiveCli窗口,执行以下命令:
add jar hdfs:///user/hdfs/mongoJar/mongo-hadoop-core-2.0.2.jar;
add jar hdfs:///user/hdfs/mongoJar/mongo-hadoop-hive-2.0.2.jar;
add jar hdfs:///user/hdfs/mongoJar/mongo-java-driver-3.9.1.jar;

create external table if not exists runoob(
id STRING,
x STRING,
y STRING
)
stored by 'com.mongodb.hadoop.hive.MongoStorageHandler'
with serdeproperties('mongo.columns.mapping'='{"id":"_id"}')
tblproperties('mongo.uri'='mongodb://172.18.20.3:27017/test.runoob');

select * from runoob;

执行结果如下,和mongodb数据库中的数据保持一致:
在这里插入图片描述

spark方式映射数据

官方文档:MongoDB Connector for Spark

一. 读取mongodb数据
spark读取mongodb数据的方式有java、scala和python等多种方式,可通过官网自行学习,本文仅介绍python方式。
python方式需要使用pyspark 或者 spark-submit的方式进行提交。

  1. pyspark启动的方式,使用pyspark启动命令行:
# 根据自己的spark版本和scala版本选择,本地安装的spark版本为2.3.0,如果是其他版本需要修改版本号和scala的版本号
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0

在这里插入图片描述

  • 在pyspark shell脚本输入如下代码:
spark = SparkSession \
        .builder \
        .appName('mongodb') \
        .config('spark.mongodb.input.uri', 'mongodb://172.18.20.3:27017/test.runoob') \
        .enableHiveSupport() \
        .getOrCreate()

df = spark.read.format('com.mongodb.spark.sql.DefaultSource').load()

df.createOrReplaceTempView('runoob')

resDf = spark.sql('select * from runoob')

resDf.show()

spark.stop()

exit(0)
  • 输出的结果和mongodb数据保持一致:
    在这里插入图片描述
  1. spark-submit的方式启动
  • 编写read_mongo.py脚本,脚本内容如下:
#!/usr/bin/python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName('mongodb') \
        .config('spark.mongodb.input.uri', 'mongodb://172.18.20.3:27017/test.runoob') \
        .enableHiveSupport() \
        .getOrCreate()
        
    df = spark.read.format('com.mongodb.spark.sql.DefaultSource').load()
    
    df.createOrReplaceTempView('runoob')
    
    resDf = spark.sql('select * from runoob')
    
    resDf.show()
    
    spark.stop()
    
exit(0)

  • spark-submit的方式提交命令,这里采用nohup提交方式,结果输出在log文件中
spark-submit \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 \
--num-executors 1 \
--executor-memory 512M \
--executor-cores 1 \
--master yarn \
--name read_mongo \
--conf spark.port.maxRetries=1000 \
read_mongo.py  >> ./read_mongo.log &

二. 使用Schema约束读取mongo数据

  1. 采用pyspark的方式,在命令行中编写如下代码:
# 导入包
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession \
        .builder \
        .appName('mongodb') \
        .config('spark.mongodb.input.uri', 'mongodb://172.18.20.3:27017/test.runoob') \
        .enableHiveSupport() \
        .getOrCreate()

# 如果mongodb中的json字段太多,我们也可以通过schema限制,过滤掉不要的数据
# name 设置为StringType
# age 设置为IntegerType

schema = StructType([
    StructField("x", IntegerType()),
    StructField("y", IntegerType())
])

df = spark.read.format('com.mongodb.spark.sql.DefaultSource').schema(schema).load()

df.createOrReplaceTempView('runoob')

resDf = spark.sql('select * from runoob')

resDf.show()

spark.stop()

exit(0)

  • 输出的结果和mongodb数据保持一致:
    在这里插入图片描述

三. 写入mongodb数据

  1. 采用pyspark的方式,在pyspark shell中编写如下代码:
# 导入包
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession \
        .builder \
        .appName('mongodb') \
        .config('spark.mongodb.output.uri', 'mongodb://172.18.20.3:27017/test.runoob') \
        .enableHiveSupport() \
        .getOrCreate()

schema = StructType([
    StructField("x", IntegerType()),
    StructField("y", IntegerType())
])
df = spark.createDataFrame([(10, 20), (20, 30), (1, 2)], schema)

df.show()

df.write.format('com.mongodb.spark.sql.DefaultSource').mode("append").save()

spark.stop()

exit(0)

在这里插入图片描述

  • 插入后的mongodb数据:
    在这里插入图片描述
  1. spark-submit的方式启动
  • 编写write_mongo.py脚本,脚本内容如下:
#!/usr/bin/python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName('mongodb') \
        .config('spark.mongodb.output.uri', 'mongodb://172.18.20.3:27017/test.runoob') \
        .enableHiveSupport() \
        .getOrCreate()
        
	schema = StructType([
	    StructField("x", IntegerType()),
	    StructField("y", IntegerType())
	])
	df = spark.createDataFrame([(10, 20), (20, 30), (1, 2)], schema)
	
	df.show()
	
	df.write.format('com.mongodb.spark.sql.DefaultSource').mode("append").save()
    
    spark.stop()
    
exit(0)

  • spark-submit的方式提交命令,这里采用nohup提交方式,结果输出在log文件中
spark-submit \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 \
--num-executors 1 \
--executor-memory 512M \
--executor-cores 1 \
--master yarn \
--name read_mongo \
--conf spark.port.maxRetries=1000 \
write_mongo.py  >> ./write_mongo.log &

四. 这里补充一个知识点

  • 使用pyspark启动命令行:
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0

上面使用 —package的方式可以直接在代码中通过配置写入,另外,读取和写入可以在一个SparkSession 中进行操作,如下:

spark = SparkSession \
        .builder \
        .appName('mongodb') \
        .config('spark.mongodb.input.uri', 'mongodb://172.18.20.3:27017/test.runoob') \
        .config('spark.mongodb.output.uri', 'mongodb://172.18.20.3:27017/test.runoob') \
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.0') \
        .enableHiveSupport() \
        .getOrCreate()

这样就可以直接在idea中进行脚本运行和断点测试了,但注意本地pyspark的版本,因我本地idea安装的pyspark版本为3.1.2,所以配置的jar包为org.mongodb.spark#mongo-spark-connector_2.12;3.0.2,运行结果如下:
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-20 18:56:33  更:2022-07-20 18:58:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 23:26:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码