Being new to big data, I was still finding my feet when I hit an exception while reading a Hive table from Spark. After a lot of searching and experimenting I solved it. The error looked like this:
Caused by: java.lang.RuntimeException: org.apache.spark.sql.AnalysisException: cannot resolve '`id`' given input columns: [demo.demo.id, demo.demo.name, demo.demo.birthday];
'Project ['id, 'name, 'birthday]
+- SubqueryAlias demo
   +- Relation[demo.id#0, demo.name#1, demo.birthday#2] JDBCRelation(demo) [numPartitions=1]
...
The code that produced the error:
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    String url = "";
    String username = "";
    String password = "";
    String driver = "";
    String path = "path";
    List<String> tables = Arrays.asList("demo");
    String sql = "select id,name,birthday from demo";

    SparkSession spark = SparkSession.builder().appName("hiveDb").getOrCreate();

    Properties properties = new Properties();
    properties.setProperty("user", username);
    properties.setProperty("password", password);

    // Register each source table as a temp view named after the table itself
    for (String table : tables) {
        Dataset<Row> dataset = spark.read().option("driver", driver).jdbc(url, table, properties);
        dataset.createOrReplaceTempView(table);
    }

    // Fails: the view "demo" exposes columns named "demo.id", "demo.name",
    // "demo.birthday", so the bare "id" in the query cannot be resolved
    spark.sql(sql).write().option("header", "true").mode("Append").csv(path);
}
Looking at the error, the column is 'demo.demo.id'. I was baffled and had no idea where that came from, so I used spark-shell to check what the column names actually look like:
scala> spark.read.jdbc(hiveUrl, "demo", connectionProperties).show()
Running that returns:
+-------+---------+-------------+
|demo.id|demo.name|demo.birthday|
+-------+---------+-------------+
+-------+---------+-------------+
The header of the printed result shows that when Spark reads a Hive table over JDBC, each result column is the table name and the column name joined together (e.g. 'demo.id'). Since our temp view carries the same name as the table, the qualified attributes become the 'demo.demo.id' columns seen in the error above, and the bare 'id' in the query cannot be resolved.
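To double-check this, the dotted names can be queried directly by escaping them with backticks in Spark SQL. This is just a sanity check, not the fix (the snippet assumes the three columns shown above):

// Sanity check (sketch): each column's literal name contains a dot
// ("demo.id"), so it only resolves when the whole name is backtick-escaped.
spark.sql("select `demo.id`, `demo.name`, `demo.birthday` from demo").show();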
The fix only needs a small adjustment to the code above: strip the table-name prefix from each column before registering the temp view.
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    String url = "";
    String username = "";
    String password = "";
    String driver = "";
    String path = "path";
    List<String> tables = Arrays.asList("demo");
    String sql = "select id,name,birthday from demo";

    SparkSession spark = SparkSession.builder().appName("hiveDb").getOrCreate();

    Properties properties = new Properties();
    properties.setProperty("user", username);
    properties.setProperty("password", password);

    for (String table : tables) {
        Dataset<Row> dataset = spark.read().option("driver", driver).jdbc(url, table, properties);
        // Hive prefixes each result column with the table name ("demo.id"),
        // so rename every column to its last segment before registering the view
        if (url.toLowerCase().startsWith("jdbc:hive2")) {
            for (String column : dataset.columns()) {
                String[] arr = column.split("\\.");
                dataset = dataset.withColumnRenamed(column, arr[arr.length - 1]);
            }
        }
        dataset.createOrReplaceTempView(table);
    }

    spark.sql(sql).write().option("header", "true").mode("Append").csv(path);
}
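As a side note, this prefixing is HiveServer2 behavior: Hive has a hive.resultset.use.unique.column.names setting (default true) that controls it. I haven't verified it against this setup, but passing it as a Hive conf variable in the JDBC URL should make the rename loop unnecessary; the host, port, and database below are placeholders:

// Hedged alternative: turn off the table-name prefix on the server side.
// Hive conf variables go after '?' in the JDBC URL (hive_conf_list);
// whether your Hive/driver version honors this is an assumption to verify.
String url = "jdbc:hive2://host:10000/default?hive.resultset.use.unique.column.names=false";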