IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【无标题】 -> 正文阅读

[大数据]【无标题】

    <dependency>
        <groupId>c3p0</groupId>
        <artifactId>c3p0</artifactId>
        <version>0.9.1.2</version>

    </dependency>

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.39</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- mysql -->
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc_2.11</artifactId>
        <version>3.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc-config -->
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc-config_2.11</artifactId>
        <version>3.1.0</version>
    </dependency>


</dependencies>



<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

package output

import java.sql.DriverManager

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**

  • 核心算子:foreachRDD
    */
    object WordCountForeachRDD {
    def main(args: Array[String]) {
    //做单词计数
    val sparkConf = new SparkConf().setAppName(“WordCountForeachRDD”).setMaster(“local[2]”)
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val lines = ssc.socketTextStream(“localhost”, 8888)
    val words = lines.flatMap(.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(
    + _)

    //将结果保存到Mysql(一) 这句代码是不能运行的。
    wordCounts.foreachRDD { (rdd, time) =>
    //创建了数据库连接
    //executed at the driver
    Class.forName(“com.mysql.jdbc.Driver”)
    val conn = DriverManager.getConnection(“jdbc:mysql://hadoop1:3306/test”, “root”, “root”)
    val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
    //statement 要从Driver通过网络发送过来
    //序列化的事,statement不支持序列化。
    //connection object not serializable)
    rdd.foreach { record =>
    //executed at the worker(Executor)
    //遍历每一条数据,然后把数据插入数据库。
    statement.setLong(1, time.milliseconds)
    statement.setString(2, record._1)
    statement.setInt(3, record._2)
    statement.execute()
    }
    statement.close()
    conn.close()
    }
    //启动Streaming处理流
    ssc.start()

    ssc.stop(false)

    //将结果保存到Mysql(二) 可以的。
    wordCounts.foreachRDD { (rdd, time) =>
    //driver

    rdd.foreach { record =>
    //为每一条数据都创建了一个连接。
    //连接使用完了以后就关闭。
    //频繁的创建和关闭连接。其实对数据库性能影响很大。
    //executor,worker
    Class.forName(“com.mysql.jdbc.Driver”)
    val conn = DriverManager.getConnection(“jdbc:mysql://hadoop1:3306/test”, “root”, “root”)
    val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
    statement.setLong(1, time.milliseconds)
    statement.setString(2, record._1)
    statement.setInt(3, record._2)
    statement.execute()
    statement.close()
    conn.close()
    }
    }

    //将结果保存到Mysql(三)
    wordCounts.foreachRDD { (rdd, time) =>

    rdd.foreachPartition { partitionRecords =>
    //executor,Worker
    //为每个partition的数据创建一个连接。
    //比如这个partition里面有1000条数据,那么这1000条数据
    //就共用一个连接。这样子的话,连接数就减少了1000倍了。
    Class.forName(“com.mysql.jdbc.Driver”)
    val conn = DriverManager.getConnection(“jdbc:mysql://hadoop1:3306/test”, “root”, “root”)
    val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")

    partitionRecords.foreach { case (word, count) =>
      statement.setLong(1, time.milliseconds)
      statement.setString(2, word)
      statement.setInt(3, count)
      statement.execute()
    }
    statement.close()
    conn.close()
    

    }
    }

    //将结果保存到Mysql(四)
    wordCounts.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionRecords =>
    //使用连接池,我们连接就可以复用
    //性能就更好了。
    val conn = ConnectionPool.getConnection
    val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")

    partitionRecords.foreach { case (word, count) =>
      //缺点: 还是一条数据一条数据插入
      //每条数据插入都需要跟MySQL进行通信。
      statement.setLong(1, time.milliseconds)
      statement.setString(2, word)
      statement.setInt(3, count)
      statement.execute()
    }
    statement.close()
    //使用完了以后,把连接还回去
    ConnectionPool.returnConnection(conn)
    

    }
    }

    //将结果保存到Mysql(五)
    wordCounts.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionRecords =>
    val conn = ConnectionPool.getConnection
    val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
    partitionRecords.foreach { case (word, count) =>
    //使用了批处理。性能就更好了。
    statement.setLong(1, time.milliseconds)
    statement.setString(2, word)
    statement.setInt(3, count)
    statement.addBatch()
    }
    statement.executeBatch()
    statement.close()
    ConnectionPool.returnConnection(conn)
    }
    }

    //将结果保存到Mysql(六)
    wordCounts.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionRecords =>
    val conn = ConnectionPool.getConnection
    //自动提交的事务关闭
    conn.setAutoCommit(false)
    val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
    partitionRecords.foreach { case (word, count) =>
    statement.setLong(1, time.milliseconds)
    statement.setString(2, word)
    statement.setInt(3, count)
    statement.addBatch()
    }
    statement.executeBatch()
    statement.close()
    //提交了一个批次以后,我们手动提交事务。
    conn.commit()
    conn.setAutoCommit(true)
    ConnectionPool.returnConnection(conn)
    }
    }

    //将结果保存到Mysql(七)
    wordCounts.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionRecords =>
    val conn = ConnectionPool.getConnection
    conn.setAutoCommit(false)
    //500 mysql
    val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
    partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>
    statement.setLong(1, time.milliseconds)
    statement.setString(2, word)
    statement.setInt(3, count)
    statement.addBatch()
    //批处理的时候,我们可以决定多少条数据为一个批次
    //我们这儿设置的是500条。
    if (index != 0 && index % 500 == 0) {
    statement.executeBatch()
    conn.commit()
    }
    }
    statement.executeBatch()
    statement.close()
    conn.commit()
    conn.setAutoCommit(true)
    ConnectionPool.returnConnection(conn)
    }
    }

    //等待Streaming程序终止
    ssc.awaitTermination()
    }

}
package output;

import com.mchange.v2.c3p0.ComboPooledDataSource;

import java.sql.Connection;
import java.sql.SQLException;

public class ConnectionPool {
private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
static {
dataSource.setJdbcUrl(“jdbc:mysql://hadoop:3306/test”);//设置连接数据库的URL

    dataSource.setUser("root");//设置连接数据库的用户名

    dataSource.setPassword("root");//设置连接数据库的密码

    dataSource.setMaxPoolSize(40);//设置连接池的最大连接数

    dataSource.setMinPoolSize(2);//设置连接池的最小连接数

    dataSource.setInitialPoolSize(10);//设置连接池的初始连接数

    dataSource.setMaxStatements(100);//设置连接池的缓存Statement的最大数
}

public static Connection getConnection() {
    try {
        return dataSource.getConnection();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return null;
}

public static void returnConnection(Connection connection) {
    if (connection != null) {
        try {
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-08 22:34:18  更:2022-03-08 22:38:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 19:54:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码