GitHub
GitHub - RedisLabs/spark-redis: A connector for Spark that allows reading and writing to/from Redis cluster
Please give it a star.
POM download
https://mvnrepository.com/artifact/com.redislabs/spark-redis
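The Maven coordinates are at the link above; if you build with sbt instead, the dependency line looks roughly like this (the artifact suffix and version are assumptions, pick the build from mvnrepository that matches your Spark and Scala versions):

// Assumed artifact/version; check mvnrepository for the build that
// matches your Spark and Scala versions.
libraryDependencies += "com.redislabs" % "spark-redis_2.11" % "2.4.2"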
Code demo
import org.apache.spark.sql._
import com.redislabs.provider.redis._

object scala_redis {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    /*
     Note: if this errors out, there are two usual causes:
     1. another dependency in the pom pulls in a conflicting jedis version;
     2. the Spark version and the spark-redis version do not match.
     pom: https://mvnrepository.com/artifact/com.redislabs/spark-redis
     github: https://github.com/RedisLabs/spark-redis
     dataframe demo: https://github.com/RedisLabs/spark-redis/blob/master/doc/dataframe.md
    */
    val spark = SparkSession
      .builder()
      .appName("redis-df")
      .master("local[1]")
      .config("spark.redis.host", "192.168.9.88")
      .config("spark.redis.port", 6379)
      .config("spark.ui.port", "30101") // fixes "Could not bind SparkUI to an available port"; alternatively increase spark.port.maxRetries
      .getOrCreate()

    val personSeq = Seq(Person("John", 30), Person("Peter", 45))
    val df = spark.createDataFrame(personSeq)

    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .save()

    spark.stop()
  }
}
Redis result
In the keys, "person" is followed by a random suffix similar to a UUID, mainly so that rows do not collide with each other.
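If you want deterministic keys instead, the spark-redis DataFrame documentation describes a key.column option that derives the key from a column; a minimal sketch, reusing the df from the demo above:

// Use the "name" column as the key, producing keys like person:John
// instead of person:<random id>.
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .save()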
Reading data
import org.apache.spark.sql._
import com.redislabs.provider.redis._

object scala_redis {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-df")
      .master("local[1]")
      .config("spark.redis.host", "192.168.9.88")
      .config("spark.redis.port", 6379)
      .config("spark.ui.port", "30101") // fixes "Could not bind SparkUI to an available port"; alternatively increase spark.port.maxRetries
      .getOrCreate()

    val df = spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .load()
    df.show()

    spark.stop()
  }
}
Output:
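With the two rows written earlier, df.show() should print roughly the following (row order may vary):

+-----+---+
| name|age|
+-----+---+
| John| 30|
|Peter| 45|
+-----+---+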
The sink supports save modes such as Append and Overwrite.
Testing Append mode
import org.apache.spark.sql._
import com.redislabs.provider.redis._

object scala_redis {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-df")
      .master("local[1]")
      .config("spark.redis.host", "192.168.9.88")
      .config("spark.redis.port", 6379)
      .config("spark.ui.port", "30101") // fixes "Could not bind SparkUI to an available port"; alternatively increase spark.port.maxRetries
      .getOrCreate()

    val personSeq = Seq(Person("Thomas", 2), Person("Peter", 100))
    val df = spark.createDataFrame(personSeq)

    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .mode(SaveMode.Append) // any Spark save mode works here; SaveMode comes from org.apache.spark.sql
      .save()

    spark.stop()
  }
}
Note: Peter has now effectively been written twice. So how does Redis store this, and what does a query return?
How the keys are stored in Redis:
As you can see, because the keys carry random suffixes, rows with the same name end up under different keys rather than overwriting each other.
Query result via the API:
Both Peter rows are present.
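That is, after the append, reading the table back returns all four rows; df.show() prints roughly (order may vary):

+------+---+
|  name|age|
+------+---+
|  John| 30|
| Peter| 45|
|Thomas|  2|
| Peter|100|
+------+---+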
Testing Overwrite mode
import org.apache.spark.sql._
import com.redislabs.provider.redis._

object scala_redis {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-df")
      .master("local[1]")
      .config("spark.redis.host", "192.168.9.88")
      .config("spark.redis.port", 6379)
      .config("spark.ui.port", "30101") // fixes "Could not bind SparkUI to an available port"; alternatively increase spark.port.maxRetries
      .getOrCreate()

    val personSeq = Seq(Person("Thomas", 11), Person("Peter", 22))
    val df = spark.createDataFrame(personSeq)

    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .mode(SaveMode.Overwrite) // replaces the whole table instead of appending
      .save()

    spark.stop()
  }
}
Result: only the two newly written rows (Thomas 11, Peter 22) remain; Overwrite replaces the whole table.
It seems an update mode is not supported.
You can also tell from the storage structure why supporting update would be hard: with random key suffixes there is no way to know which existing key a new row should replace.
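One workaround (an assumption based on the key.column option shown earlier, not an official update mode): if the key is deterministic, appending a row with an existing key simply rewrites that hash, which behaves like an upsert for that row. A sketch, reusing spark and Person from the demos above:

// With key.column set, keys are person:<name>, so writing Peter again
// overwrites the hash at person:Peter instead of creating a new key.
val update = spark.createDataFrame(Seq(Person("Peter", 23)))
update.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .mode(SaveMode.Append)
  .save()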
Schema
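spark-redis can read the table back without any column declarations, but you can also supply an explicit schema through the standard Spark reader API; a minimal sketch:

import org.apache.spark.sql.types._

// Declare the schema explicitly instead of relying on inference.
val personSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)
))
val df = spark.read
  .format("org.apache.spark.sql.redis")
  .schema(personSchema)
  .option("table", "person")
  .load()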
All of the code above comes from the GitHub project's README.
Troubleshooting
If execution fails with an error, check that the Spark version matches the spark-redis version,
that the Scala version matches,
and that there is no jedis package conflict (see the sketch below).
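If another dependency transitively pulls in an incompatible jedis, exclude it there; a hedged sbt sketch, where "com.example" % "some-lib" is a hypothetical stand-in for whichever dependency causes the conflict:

// Hypothetical dependency that drags in an old jedis; exclude it
// so that spark-redis's own jedis version wins.
libraryDependencies += ("com.example" % "some-lib" % "1.0.0")
  .exclude("redis.clients", "jedis")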