Spark version: 2.4.6
1. Preface
This post records my study of executing SQL with Spark, covering both reading data out of IoTDB and writing it into a database. Main reference: the official Spark documentation.
2. pom dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.6</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>15.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.6</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.iotdb</groupId>
        <artifactId>spark-iotdb-connector</artifactId>
        <version>0.11.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Note: the Scala suffix of the Spark artifacts must be 2.11, not 2.12. With 2.12 the job fails with: Caused by: java.lang.ClassNotFoundException: scala.Product$class
3. Demo
1. First start IoTDB (version 0.11.2 is used here), then start MySQL.
2. Create a database in MySQL whose name matches the JDBC URL in the code below, e.g. CREATE DATABASE spark_iotdb_to_mysql;
3. Write the code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkOperationIotdbTest {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Java App Test").master("local")
                .getOrCreate();
        // Read from IoTDB through the spark-iotdb-connector
        Dataset<Row> df = spark.read().format("org.apache.iotdb.spark.db")
                .option("url", "jdbc:iotdb://localhost:6667/")
                .option("user", "root")
                .option("password", "root")
                .option("sql", "select last * from root.test")
                .load();
        df.printSchema();
        df.show();
        // Write the result into the MySQL database over JDBC;
        // Spark creates the target table and its columns automatically
        df.write().format("jdbc")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .option("url", "jdbc:mysql://localhost:3306/spark_iotdb_to_mysql?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8")
                .option("user", "root")
                .option("password", "root")
                .option("dbtable", "iotdb_data_table")
                .save();
        spark.stop();
    }
}
Here master("local") runs Spark in local mode. spark.read().load() performs the read and df.write().save() performs the write; you only need to create the database beforehand, and Spark creates the table and the required columns automatically.
Note: if the target table already exists, the job fails with an error, because save() defaults to SaveMode.ErrorIfExists; a workaround sketch follows below.
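If the job has to be re-runnable against an existing table, the save mode can be set explicitly instead of relying on the default. A minimal sketch, replacing the df.write() call in the demo above (SaveMode.Append keeps existing rows, SaveMode.Overwrite drops and recreates the table; the connection options are unchanged):

import org.apache.spark.sql.SaveMode;

df.write().format("jdbc")
        .mode(SaveMode.Append) // or SaveMode.Overwrite to replace the existing table
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("url", "jdbc:mysql://localhost:3306/spark_iotdb_to_mysql?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8")
        .option("user", "root")
        .option("password", "root")
        .option("dbtable", "iotdb_data_table")
        .save();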
The result of a successful run:
Test succeeded!
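To double-check the write from Spark itself, the MySQL table can be read back over JDBC; a minimal sketch using the same connection options as above (add it before spark.stop() in the demo):

// Read the freshly written MySQL table back and display its contents
Dataset<Row> check = spark.read().format("jdbc")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("url", "jdbc:mysql://localhost:3306/spark_iotdb_to_mysql?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8")
        .option("user", "root")
        .option("password", "root")
        .option("dbtable", "iotdb_data_table")
        .load();
check.show();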