IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark 连接 Mongodb 批量读取数据 -> 正文阅读

[大数据]Spark 连接 Mongodb 批量读取数据

Spark 连接 mongodb ,并多次切换集合

方案一:通过 JavaSparkContext 连接 mongodb ,利用 MongoSpark.load() 方法获取集合数据

测试 demo 如下:

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import java.util.ArrayList;
import java.util.HashMap;

public class SparkReadMongodbs {
    public static void main(String[] args) {
        String mongoUrl="mongodb://root:root123456@192.168.1.124:27017,192.168.1.123:27017,192.168.1.125:27017/";
        String database="lhiot";
        String dbCollection="0762a06a97b3628bd00037e6f66c7d16";
        String port = "27017";

        SparkSession.Builder builder =SparkSession.builder().master("local[*]").appName("SparkCalculateRecommend")
                .config("spark.mongodb.input.uri", mongoUrl+database+"."+dbCollection+"?authSource=admin")
                .config("spark.executor.memory", "512mb");

        SparkSession spark = builder.getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        //使用Spark连接器载入sparkContext,获取RDD对象
        JavaMongoRDD<Document> c1 = MongoSpark.load(jsc);

        ArrayList<String> collections = new ArrayList<>();
        collections.add("00dfaed143dcbb02ae21aaec492d369d");
        collections.add("020a91e9c60fab73d244ba797c485e47");
        collections.add("02a70e55a7ff1a4ebb4dbbeb3e28c137");
        collections.add("0588dee7e8fdde3d95ba250affeab843");
        collections.add("0762a06a97b3628bd00037e6f66c7d16");
        collections.add("0914e6088a799c8cee11df25e11e2534");
        collections.add("0f768fc73fed9752fd87f432e9d77ba6");
        collections.add("1336a41b0bd13e1ca6a86905b9c6fd9d");
        collections.add("1ea1b22693d1bdb592853ec59c4d1fe3");

        HashMap<String, String> readOverrides = new HashMap<>();

        for (String collection : collections) {
            readOverrides.put("collection", collection);
            //读取数据库对应集合数据
            ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);
            //获取该设备集合数据
            JavaMongoRDD<Document> c2 = MongoSpark.load(jsc,readConfig);
            c2.toDF()
                    .select("_id.oid", "deviceCode", "funCode", "deptId", "deptName", "mountId", "mountName", "deviceId",
                            "pointId", "pointName", "pointOrderNum", "value", "pointDisplayName", "unit", "originTime", "createTime")
                    .withColumnRenamed("oid", "id")
                    .filter(new FilterFunction<Row>() {
                        @Override
                        public boolean call(Row value) throws Exception {
                            String originTime = value.getAs("originTime").toString();
                            return originTime.compareTo("2022-01-22 00:00:00")>=0 && originTime.compareTo("2022-01-22 23:59:59")<=0;
                        }
                    })
                    .show();
        }
		jsc.stop();
        spark.stop();
    }
}

该方法,在切换集合时,会产生大量的新增连接,程序结束,所有连接会断开。
但是如果业务需要从大量的集合中读取数据,这个方式就不太合适了,维护大量的连接,spark会消耗大量的内存,同事mongo端也会有很大压力,甚至会导致数据库服务的挂掉。

方案二:通过 JavaSparkContext 连接 mongodb ,利用 sqlContext.read().load() 方法获取集合数据

测试 demo 如下:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class sparkReadMongodbWithoutCol {
    public static void main(String[] args) {
        String mongoUrl="mongodb://root:root123456@192.168.1.124:27017,192.168.1.123:27017,192.168.1.125:27017/";
        String database="lhiot";
        String dbCollection="0762a06a97b3628bd00037e6f66c7d16";
        String port = "27017";

        //将options的配置信息存储到一个map里
        Map<String, String> map = new HashMap<String, String>();
//        map.put("uri",mongoUrl);
        map.put("database", database);
//        map.put("collection", dbCollection);

        //连接mongodb服务器
        SparkConf sc = new SparkConf().setMaster("local").setAppName("SparkConnectMongo")
                .set("spark.app.id", "MongoSparkConnectorTour")
                .set("spark.mongodb.input.uri", mongoUrl + "?authSource=admin")
                .set("spark.testing.memory","471859200");
        JavaSparkContext jsc = new JavaSparkContext(sc);
        SQLContext sqlContext = new SQLContext(jsc);

        ArrayList<String> collections = new ArrayList<>();
        collections.add("0762a06a97b3628bd00037e6f66c7d16");
        collections.add("00dfaed143dcbb02ae21aaec492d369d");
        collections.add("020a91e9c60fab73d244ba797c485e47");
        collections.add("02a70e55a7ff1a4ebb4dbbeb3e28c137");
        collections.add("0588dee7e8fdde3d95ba250affeab843");
        collections.add("0762a06a97b3628bd00037e6f66c7d16");
        collections.add("0914e6088a799c8cee11df25e11e2534");
        collections.add("0f768fc73fed9752fd87f432e9d77ba6");
        collections.add("1336a41b0bd13e1ca6a86905b9c6fd9d");
        collections.add("1ea1b22693d1bdb592853ec59c4d1fe3");

        for (String collection : collections) {
            map.put("collection", collection);
            //读取数据库对应集合数据
            Dataset<Row> res = sqlContext.read().format("com.mongodb.spark.sql").options(map).load();
            res.registerTempTable("table");
            sqlContext.sql("select * from table").show();
        }

        jsc.stop();
    }

}

该方案再切换不同集合时,不会产生大量的连接,整个程序只会在mongo端产生2个连接,程序结束,2个连接也会自动断开。
该方案就比较适合需要同时读取大量集合数据的需求场景。

以上只是我的简单测试方案,理解较为浅显,欢迎大佬留言交流,谢谢鉴赏。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-31 12:04:09  更:2022-10-31 12:05:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 13:54:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码