Preface
Before introducing Spark Thrift Server, it helps to first explain how it relates to, and differs from, HiveServer2 and spark-sql.
HiveServer2
Hive provides a command-line terminal: on a machine with Hive installed, once the metastore database is configured and the Hadoop configuration files are specified, typing the hive command drops you into Hive's interactive shell, where you write SQL much as you would in a traditional RDBMS terminal. After starting the hiveserver2 service, Hive can be operated not only through the beeline CLI that it ships with, but also from Java via JDBC against HiveServer2; both paths ultimately talk to Hive through the JDBC interface. Beyond that, you can also hand-write MapReduce jobs in Java against the API.
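As a minimal sketch of the beeline route, assuming HiveServer2 runs on its default port 10000 (the hostname and user here are illustrative):

beeline -u jdbc:hive2://localhost:10000 -n hive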
Spark SQL JDBC: the Spark Thrift JDBCServer
Serving the same role as hiveserver2, the Spark Thrift JDBCServer is a Spark process; once started, it can be operated not only through beeline as a CLI, but also through Java JDBC connections. In production, the Spark Thrift JDBCServer is usually integrated with Hive (sharing Hive's metastore database). Choosing the Spark Thrift JDBCServer is typically based on the following considerations:
1. You want to analyze data with SQL;
2. You need to connect via Java JDBC;
3. You want in-memory computation for fast data processing;
4. You want integration with Hive;
5. You want resource scheduling through YARN.
The spark-sql command
/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
spark-sql essentially calls the Spark SQL API through spark-submit, and every spark-sql invocation starts its own application.
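For illustration, each of the following invocations launches a separate YARN application (the database, table, and file names are made up):

spark-sql --master yarn -e "select count(*) from db01.t1"
spark-sql --master yarn -f /path/to/query.sql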
Differences:
- hiveserver2 is a Java process that runs locally on the server. It accepts client requests over JDBC, handles SQL parsing and execution-plan generation, and then hands the plan to YARN for scheduled execution; it does not execute anything itself.
- The spark thrift server process is essentially a Spark Application, i.e. a single long-running application on YARN (when YARN is used for resource management). It accepts client requests over JDBC, parses the SQL, and generates the execution plan, but then executes the plan inside the current application rather than launching a separate YARN job. This requires the application's resources to be both sufficient and elastic.
- spark-sql is a session-level application task, not a service.
Choosing between spark-sql and spark-thrift-server
Comparing the two: spark-sql is better in terms of independence, since each invocation is its own application. The JDBC route is somewhat more complex to configure, depends more heavily on the spark-thrift-server, and therefore has more ways to fail. In terms of calling convention, however, JDBC offers better consistency, with the caveat that the spark-thrift-server application's resources must be sufficient and elastic.
Spark Thrift Server walkthrough
Environment configuration
Enable Spark dynamic resource allocation
Add the following to spark-defaults.conf (or spark2-thrift-sparkconf):
spark.shuffle.service.enabled true //enable the External Shuffle Service
spark.dynamicAllocation.enabled true //enable dynamic resource allocation
spark.dynamicAllocation.minExecutors 0 //minimum number of executors allocated per application
spark.dynamicAllocation.maxExecutors 30 //maximum number of concurrently allocated executors per application; set this as high as your cluster allows
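The same settings can also be passed on the command line when starting the Thrift Server instead of editing the defaults file; a sketch, with paths and values to be adjusted for your environment:

./sbin/start-thriftserver.sh \
  --master yarn \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=0 \
  --conf spark.dynamicAllocation.maxExecutors=30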
Configuration reference
Add the following configuration to yarn-site.xml:
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle,mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
Distribute spark-*-yarn-shuffle.jar
Download spark-2.4.6-bin-hadoop2.7 (or whichever version you need). spark-*-yarn-shuffle.jar is in Spark's yarn directory:
[root@bdpapp01 spark-2.4.6-bin-hadoop2.7]# ls yarn/
spark-2.4.6-yarn-shuffle.jar
Copy spark-2.4.6-yarn-shuffle.jar to Hadoop YARN's dependency directory on every node that runs a NodeManager:
scp spark-2.4.6-yarn-shuffle.jar root@bdphdp03:/usr/hdp/3.0.1.0-187/hadoop-yarn/lib
or
scp spark-2.4.6-yarn-shuffle.jar root@bdphdp03:${HADOOP_HOME}/share/hadoop/yarn/lib
Note: the Hadoop YARN installation directory differs across Hadoop distributions.
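A sketch for pushing the jar to every NodeManager host and then restarting the NodeManagers (the hostnames are illustrative, and the restart command assumes Hadoop 3's yarn --daemon syntax; on an Ambari-managed cluster you would restart the NodeManagers from Ambari instead):

for host in bdphdp01 bdphdp02 bdphdp03; do
  scp spark-2.4.6-yarn-shuffle.jar root@${host}:/usr/hdp/3.0.1.0-187/hadoop-yarn/lib/
  ssh root@${host} "yarn --daemon stop nodemanager && yarn --daemon start nodemanager"
done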
Starting the service:
Mode one: starting the Spark Thrift JDBC server
[root@bdpapp01 sbin]# pwd
/usr/hdp/3.0.1.0-187/spark2/sbin
[root@bdpapp01 sbin]# ./start-thriftserver.sh
Check the listening port:
[root@bdpapp01 sbin]# lsof -i:10016
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 14742 spark 754u IPv6 1584925544 0t0 TCP *:10016 (LISTEN)
I am using Ambari HDP here, where the default port is 10016; it can be changed via hive.server2.thrift.port.
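For instance, the port can be overridden at startup with --hiveconf (the value here is illustrative):

./start-thriftserver.sh --hiveconf hive.server2.thrift.port=10016

Check the SparkSubmit process: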
[root@bdpapp01 sbin]# jps
3808 HistoryServer
8884 LivyServer
19286 Jps
14742 SparkSubmit
Note: the Spark Thrift Server only supports yarn-client mode, not cluster mode.
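Because the Thrift Server is itself a YARN application, it should also show up in YARN's application list; for example (output will vary by cluster):

yarn application -list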
Mode two: starting the Spark Thrift JDBC server (Thrift in HTTP mode)
The Spark Thrift Server additionally supports sending Thrift RPC messages over HTTP transport. HTTP mode is enabled through system properties or the hive-site.xml file under conf/: change hive.server2.transport.mode to http (the default is binary, i.e. TCP). Copy hive-site.xml into Spark's conf directory and add:
<property>
<name>hive.server2.transport.mode</name>
<value>http</value>
</property>
Start:
start-thriftserver.sh //the HTTP port defaults to 10001
or
start-thriftserver.sh --hiveconf hive.server2.thrift.http.port=10002 //override the port via a parameter
Check the listening port (lsof displays port 10002 as documentum, the service name mapped to 10002 in /etc/services):
[root@bdpapp01 bin]# lsof -i:10002
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 25102 spark 754u IPv6 1586345112 0t0 TCP *:documentum (LISTEN)
Connecting with beeline
[root@bdpapp01 bin]# ./beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://11.51.196.255:10016
Connecting to jdbc:hive2://11.51.196.255:10016
Enter username for jdbc:hive2://11.51.196.255:10016: spark
Enter password for jdbc:hive2://11.51.196.255:10016: *****
22/03/08 17:17:48 INFO Utils: Supplied authorities: 11.51.196.255:10016
22/03/08 17:17:48 INFO Utils: Resolved authority: 11.51.196.255:10016
22/03/08 17:17:48 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://11.51.196.255:10016
Connected to: Spark SQL (version 2.4.6)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://11.51.196.255:10016>
Alternatively, connecting with beeline while Thrift runs in HTTP mode:
[root@bdpapp01 bin]# ./beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://11.51.196.255:10002/default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice
Connecting to jdbc:hive2://11.51.196.255:10002/default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice
Enter username for jdbc:hive2://11.51.196.255:10002/default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice: spark
Enter password for jdbc:hive2://11.51.196.255:10002/default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice: *****
22/03/08 19:39:31 INFO Utils: Supplied authorities: 11.51.196.255:10002
22/03/08 19:39:31 WARN Utils: ***** JDBC param deprecation *****
22/03/08 19:39:31 WARN Utils: The use of hive.server2.transport.mode is deprecated.
22/03/08 19:39:31 WARN Utils: Please use transportMode like so: jdbc:hive2://<host>:<port>/dbName;transportMode=<transport_mode_value>
22/03/08 19:39:31 WARN Utils: ***** JDBC param deprecation *****
22/03/08 19:39:31 WARN Utils: The use of hive.server2.thrift.http.path is deprecated.
22/03/08 19:39:31 WARN Utils: Please use httpPath like so: jdbc:hive2://<host>:<port>/dbName;httpPath=<http_path_value>
22/03/08 19:39:31 INFO Utils: Resolved authority: 11.51.196.255:10002
Connected to: Spark SQL (version 2.4.6)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://11.51.196.255:10002/default>
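As the deprecation warnings suggest, the same connection can be made with the newer parameter names transportMode and httpPath:

beeline> !connect jdbc:hive2://11.51.196.255:10002/default;transportMode=http;httpPath=cliservice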
Connecting via Java JDBC
Dependencies:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.1.0</version>
</dependency>
Java code:
package org.apache.dolphinscheduler.ide;

import org.apache.hive.jdbc.HiveStatement;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class SparkSQLExample {

    HiveStatement stmt = null;

    public static void main(String[] args) throws Exception {
        new SparkSQLExample().sparkexecute();
    }

    public void sparkexecute() throws Exception {
        String url = "jdbc:hive2://11.51.196.255:10016/";
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection conn = DriverManager.getConnection(url, "hive", "123456");
        stmt = (HiveStatement) conn.createStatement();

        // Tail the server-side query log in a background thread while the query runs.
        HiveLogFetchThread hiveLogFetchThread = new HiveLogFetchThread();
        FutureTask<Boolean> hiveLogFutureTask = new FutureTask<>(hiveLogFetchThread);
        Thread thread = new Thread(hiveLogFutureTask);
        thread.start();

        String sql = "select * from db01.t1 order by id";
        System.out.println("Running: " + sql);
        ResultSet res = stmt.executeQuery(sql);

        // The query has returned: signal the log thread to stop and wait for it to exit.
        hiveLogFetchThread.setFlag(false);
        hiveLogFutureTask.get(30, TimeUnit.SECONDS);

        while (res.next()) {
            System.out.println("id: " + res.getInt(1) + "\tname: " + res.getString(2) + "\tage: " + res.getString(3));
        }

        res.close();
        stmt.close();
        conn.close();
    }

    class HiveLogFetchThread implements Callable<Boolean> {

        private volatile boolean flag = true;

        @Override
        public Boolean call() throws Exception {
            while (true) {
                if (stmt.hasMoreLogs()) {
                    // getQueryLog() drains the log lines buffered on the server side.
                    List<String> logList = stmt.getQueryLog();
                    for (String str : logList) {
                        System.out.println("==========:" + str);
                    }
                }
                if (!flag) {
                    break;
                }
                Thread.sleep(500); // avoid busy-spinning against the server
            }
            return true;
        }

        public void setFlag(boolean flag) {
            this.flag = flag;
        }
    }
}
Problems and solutions
Problem 1
The spark-thrift-server log reports:
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
And on the YARN application side, the log reports:
org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist
Solution: enable the spark_shuffle aux-service (see the yarn-site.xml configuration above).
Problem 2
The YARN NodeManager fails to start, reporting:
org.apache.spark.network.yarn.YarnShuffleService does not exist (class not found)
Solution: distribute spark-*-yarn-shuffle.jar as described above.
References:
- Spark Thrift JDBCServer应用场景解析与实战案例
- Java 连接 Spark Thrift Server/Hive Server总结
- Running the Thrift JDBC/ODBC server