Original post: https://dongkelun.com/2021/02/19/javaSparkThriftServer/
Preface
A summary of Spark Thrift Server and Hive Server, and how to connect to them from Java.
Startup
hive server
hiveserver2
or
hive --service hiveserver2
The default port is 10000.
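To quickly check that HiveServer2 is actually reachable on that port, a small JDBC metadata probe can be used. This is a minimal sketch, not from the original post: the class name and localhost are placeholders, and it assumes hive-jdbc is on the classpath.
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;

public class HiveServerCheck {
    public static void main(String[] args) throws Exception {
        // 10000 is the default port for the binary (TCP) transport mode
        String url = "jdbc:hive2://localhost:10000/default"; // placeholder host
        try (Connection conn = DriverManager.getConnection(url)) {
            DatabaseMetaData md = conn.getMetaData();
            // name and version as reported by the server itself
            System.out.println(md.getDatabaseProductName() + " " + md.getDatabaseProductVersion());
        }
    }
}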
spark thrift server
Set hive.server2.transport.mode to http (the default is binary, i.e. TCP; the allowed values are binary and http): copy hive-site.xml into the Spark conf directory and add
<property>
    <name>hive.server2.transport.mode</name>
    <value>http</value>
</property>
Startup command (under $SPARK_HOME/sbin):
start-thriftserver.sh # the port defaults to 10001 in http mode
or
start-thriftserver.sh --hiveconf hive.server2.thrift.http.port=10002 # override the port via a --hiveconf parameter
or
spark-submit --master yarn --deploy-mode client --executor-memory 2G --num-executors 25 --executor-cores 2 --driver-memory 16G --driver-cores 2 --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --name "Thrift JDBC/ODBC Server" spark-internal --hiveconf hive.server2.thrift.http.port=10003
(start-thriftserver.sh itself also just invokes the class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2)
When hive.server2.transport.mode is http, the default port is 10001 and can be changed with --hiveconf hive.server2.thrift.http.port; when it is the default binary (TCP), the default port is 10000 and can be changed with --hiveconf hive.server2.thrift.port. In other words, the default port depends only on the transport mode, not on whether the server is Hive's or Spark's. Why not stick with the default mode for Spark here? With the default mode everything still works, but the Spark server log keeps showing an exception; the cause is unknown and remains to be investigated.
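On the client side, the JDBC URL has to match the server's transport mode. As an illustration of the mapping above (a sketch; the host and ports are placeholders, and the full demo below uses these same URLs):
public class JdbcUrls {
    // binary (TCP) mode: default port 10000, overridden with --hiveconf hive.server2.thrift.port
    static final String BINARY_URL = "jdbc:hive2://192.168.44.128:10000/default";

    // http mode: default port 10001, overridden with --hiveconf hive.server2.thrift.http.port;
    // the URL must also declare the transport mode and the HTTP path (cliservice by default)
    static final String HTTP_URL = "jdbc:hive2://192.168.44.128:10001/default"
            + "?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice";

    public static void main(String[] args) {
        System.out.println(BINARY_URL);
        System.out.println(HTTP_URL);
    }
}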
Java code
pom dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>2.3.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.4</version>
    </dependency>
</dependencies>
Update (2021-05-07): a note on dependency versions. The versions above come from the open-source Hive and Hadoop cluster I set up myself, so they were easy to determine, and they are compatible with my Spark version. Later, when connecting to the Hive and Spark shipped with HDP, I ran into version-mapping problems, summarized here. In short, connecting to Spark Thrift Server is quite strict about client versions, while Hive Server tolerates a much wider range.
To find the versions on HDP, either add the service in the Ambari UI, which shows the version of each component including Hive, or inspect the jars on the command line. For example, hive-jdbc-3.1.0.3.1.0.0-78.jar means the Hive version is 3.1.0 (the suffix is the HDP version number). Dependencies configured that way connect to Hive Server without problems, but connecting to the Spark server failed with a version-mismatch exception. The jar under spark/jars is hive-jdbc-1.21.2.3.1.0.0-78.jar, which would suggest hive-jdbc 1.21.2, yet no such version exists as a published dependency, and even the 3.1.0 client above hits the same mismatch against the Spark server. So how do you determine the right hive-jdbc version in this situation? I used the following method: first confirm the Spark version (2.4 in my case), then look up the dependencies in the source of that Spark release on GitHub, which showed Hive 1.2.1 and Hadoop 2.6.5. With hive-jdbc:1.2.1 and hadoop-common:2.6.5, the client connects to the Spark Thrift Server under HDP, and that same hive-jdbc version also connects to Hive Server, which is what I meant above about Hive Server being tolerant of client versions.
One last note: with hive-jdbc 3.1.0 (the 3.x line), there is no need to add hadoop-common separately to connect to Hive Server, because the hive-jdbc artifact already bundles the required dependencies; adding both actually causes dependency conflicts.
Appendix: the exception thrown when the versions do not match (the unset client_protocol field is the typical symptom of a client whose Thrift protocol version is newer than the server's):
14:32:23.971 [main] ERROR org.apache.hive.jdbc.HiveConnection - Error opening session
org.apache.thrift.TApplicationException: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null, configuration:{set:hiveconf:hive.server2.thrift.resultset.default.fetch.size=1000, use:database=sjtt})
at org.apache.thrift.TApplicationException.read(TApplicationException.java:111) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.hive.service.rpc.thrift.TCLIService$Client.recv_OpenSession(TCLIService.java:168) ~[hive-service-rpc-2.3.7.jar:2.3.7]
at org.apache.hive.service.rpc.thrift.TCLIService$Client.OpenSession(TCLIService.java:155) ~[hive-service-rpc-2.3.7.jar:2.3.7]
at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:680) [hive-jdbc-2.3.7.jar:2.3.7]
at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:200) [hive-jdbc-2.3.7.jar:2.3.7]
at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:107) [hive-jdbc-2.3.7.jar:2.3.7]
at java.sql.DriverManager.getConnection(DriverManager.java:664) [?:1.8.0_161]
at java.sql.DriverManager.getConnection(DriverManager.java:270) [?:1.8.0_161]
at com.dkl.blog.SparkThriftServerDemoWithKerberos.jdbcDemo(SparkThriftServerDemoWithKerberos.java:41) [classes/:?]
at com.dkl.blog.SparkThriftServerDemoWithKerberos.main(SparkThriftServerDemoWithKerberos.java:35) [classes/:?]
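When chasing this kind of mismatch, it also helps to print the versions the client-side jars actually carry. A minimal sketch, not from the original post, assuming hive-jdbc (which pulls in hive-common) and hadoop-common are on the classpath:
import org.apache.hadoop.util.VersionInfo;
import org.apache.hive.common.util.HiveVersionInfo;

public class ClientVersions {
    public static void main(String[] args) {
        // version baked into the hive-common jar that hive-jdbc depends on
        System.out.println("hive client:   " + HiveVersionInfo.getVersion());
        // version baked into the hadoop-common jar
        System.out.println("hadoop client: " + VersionInfo.getVersion());
    }
}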
Code
package com.dkl.blog;

import java.sql.*;

public class SparkThriftServerDemo {

    private static final String HIVE_JDBC_URL = "jdbc:hive2://192.168.44.128:10000/default";
    private static final String SPARK_JDBC_URL = "jdbc:hive2://192.168.44.128:10001/default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice";

    public static void main(String[] args) throws SQLException {
        System.out.println("select from hive");
        jdbcDemo(HIVE_JDBC_URL);
        System.out.println("select from spark thrift server");
        jdbcDemo(SPARK_JDBC_URL);
    }

    public static void jdbcDemo(String jdbc_url) throws SQLException {
        Connection connection = null;
        try {
            connection = DriverManager.getConnection(jdbc_url);
            selectTable(connection);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // guard against an NPE when getConnection itself failed
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void selectTable(Connection connection) {
        String sql = "select * from test limit 10";
        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sql);
            System.out.println("=====================================");
            while (rs.next()) {
                System.out.println(rs.getString(1) + "," + rs.getString(2));
            }
            System.out.println("=====================================");
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // close the ResultSet before the Statement that produced it
            close(rs);
            close(stmt);
        }
    }

    private static void close(Statement stmt) {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    private static void close(ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
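For reference, the connection handling above can be written more compactly with try-with-resources (Java 7+), which closes the ResultSet, Statement, and Connection automatically and in the right order. A sketch equivalent to jdbcDemo plus selectTable (the class name is a placeholder):
import java.sql.*;

public class SparkThriftServerDemoCompact {
    public static void jdbcDemo(String jdbcUrl) {
        String sql = "select * from test limit 10";
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "," + rs.getString(2));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}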
The code has been uploaded to GitHub.