前言
最近项目需要用Scala读取Hive的Table,但是Spark和Hive的API比较重量级,性能不够。于是调研,发现了一个更加底层的框架Eel:
示例
为了便于理解,先从读取简单的CSV开始
Local
CSV File
我自己新建了一个CSV文件
school_number,student_name
1,Bernoulli
2,Newton
3,Leibniz
sbt导入依赖libraryDependencies += "io.eels" % "eel-core_2.12" % "1.2.4" 后,读取的代码
import io.eels.{DataTypeRule, SchemaInferrer}
import io.eels.component.csv.CsvSource
import io.eels.schema.{IntType, StringType}
import java.nio.file.Paths
object CsvReaderDemo {
def main(args: Array[String]): Unit = {
val csvFilePath = Paths.get("./datas/students.csv")
CsvSource(csvFilePath).toDataStream()
.schema.fields.foreach(f => println(f))
val filedTypeMapping = Seq(
DataTypeRule("school_number", IntType.Signed),
DataTypeRule("student_name", StringType))
val schemaInferrer = SchemaInferrer(
StringType,
filedTypeMapping)
CsvSource(csvFilePath).withSchemaInferrer(schemaInferrer)
.toDataStream()
.filter(_.values(0).toString.toInt > 0)
.filter(_.get("student_name").equals("Leibniz"))
.collect
.foreach(row => println(row))
}
}
打印的结果如下
Hive
需要创建HMS的客户端
val hiveConf = new HiveConf()
hiveConf.set(ConfVars.METASTOREURIS.varname, "thrift://localhost:9083")
implicit val hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf)
Parquet
准备数据
CREATE EXTERNAL TABLE IF NOT EXISTS `eel_test.person` (
`NAME` string,
`AGE` int,
`SALARY` decimal(38,5),
`CREATION_TIME` timestamp)
PARTITIONED BY (`title` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/client/eel_test/persons';
INSERT INTO eel_test.person PARTITION (title='Mr') SELECT 'Fred', 50, 50000.99, CURRENT_TIMESTAMP();
INSERT INTO eel_test.person PARTITION (title='Mr') SELECT 'Gary', 50, 20000.34, CURRENT_TIMESTAMP();
INSERT INTO eel_test.person PARTITION (title='Mr') SELECT 'Alice', 50, 99999.98, CURRENT_TIMESTAMP()
读取数据的代码
HiveSource("eel_test", "eel_test")
.toDataStream()
.collect
.foreach(row => println(row))
ORC
准备数据
CREATE EXTERNAL TABLE IF NOT EXISTS eel_test.test_orc(
id int,
name string
)
PARTITIONED BY (create_date string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS ORC
LOCATION '/client/eel_test/test_orc'
INSERT INTO eel_test.test_orc PARTITION(create_date='20210101')
SELECT 1, 'Alice'
UNION
SELECT 2, 'Fred'
UNION
SELECT 3, 'Gary';
读取数据的代码
HiveSource("eel_test", "eel_test")
.toDataStream()
.collect
.foreach(row => println(row))
后记
Eel还能直接读取HDFS,更多例子可以参考github
|