import java.util
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import redis.clients.jedis.{Jedis, JedisPool}
import redis.clients.util.Pool
import scala.collection.mutable.ArrayBuffer
/**
 * Batch job that syncs the top-played "good book" stories from Hive into a
 * Redis sorted set (key EXPLAIN_BOOK_ZS):
 *   - members present both before and now get their score refreshed,
 *   - members only present before (stale) are removed,
 *   - members only present now are added.
 */
object GoodBookSpeakWellDateToRedis {

  /** Redis sorted-set key: story_id scored by 14-day play count. */
  private val RedisKey = "EXPLAIN_BOOK_ZS"

  // Shared connection pool; must be initialised via init() before any Redis call.
  private[this] var jedisPool: Pool[Jedis] = _

  /** Initialises the Jedis connection pool. Call once before zadd/zrem/getResultToRedis. */
  def init(host: String, port: Int, timeout: Int, password: String): Unit = {
    jedisPool = new JedisPool(new GenericObjectPoolConfig, host, port, timeout, password)
  }

  /**
   * Adds (or updates) `story_id` in sorted set `key` with score `story_14d_play_cts`.
   * The connection is returned to the pool even if the command throws
   * (the original leaked it on exception).
   */
  def zadd(key: String, story_14d_play_cts: Int, story_id: String): Unit = {
    val jedis = jedisPool.getResource
    try jedis.zadd(key, story_14d_play_cts, story_id)
    finally jedis.close()
  }

  /** Removes `member` from sorted set `key`, always releasing the connection. */
  def zrem(key: String, member: String): Unit = {
    val jedis = jedisPool.getResource
    try jedis.zrem(key, member)
    finally jedis.close()
  }

  /**
   * Reads (story_id, 14-day play count) rows from Hive and reconciles them
   * into the Redis sorted set as described on the object doc.
   */
  def getResultToRedis(spark: SparkSession): Unit = {
    val resultDataNew: DataFrame = spark.sql(
      """
        |select story_id
        |      ,cast(story_14d_play_cts as int) as story_14d_play_cts
        |from ads.ads_ks_log_story_play_hsjj_14h_sum_a_d
        |order by story_14d_play_cts desc
        |""".stripMargin)
    resultDataNew.show(20)

    // Collect pairs to the driver. NOTE(review): assumes the result set is
    // driver-sized (top stories only) — confirm against the table's volume.
    val nowDate: Array[(String, Int)] =
      resultDataNew.rdd.map(row => (row.getString(0), row.getInt(1))).collect()

    // Members currently stored in Redis. Explicit JavaConverters/.asScala
    // replaces the deprecated implicit JavaConversions import.
    import scala.collection.JavaConverters._
    val jedis = jedisPool.getResource
    val beforeDate: Set[String] =
      try jedis.zrevrange(RedisKey, 0, -1).asScala.toSet
      finally jedis.close() // was leaked on exception before

    val sameDate = new ArrayBuffer[String]
    val nowNoSameDate = new ArrayBuffer[(String, Int)]
    for (elem <- nowDate) {
      if (beforeDate.contains(elem._1)) {
        sameDate += elem._1
        zadd(RedisKey, elem._2, elem._1) // refresh score of surviving member
      } else {
        nowNoSameDate += elem
      }
    }

    // Stale members = in Redis before but absent from the fresh result.
    // Collapses the original's duplicated 0<len<10 / len==0 branches (which
    // also used bitwise `&` where short-circuit `&&` was intended): with an
    // empty sameDate the filter keeps everything, so one filterNot suffices.
    val beforeNoSameDate: Seq[String] =
      if (sameDate.length < 10) beforeDate.filterNot(sameDate.contains).toSeq
      else Seq.empty

    // BUG FIX: the original walked both buffers with a single index bounded by
    // nowNoSameDate.length, throwing IndexOutOfBoundsException whenever there
    // were more new members than stale ones, and leaving stale members behind
    // in the opposite case. Remove all stale members, then add all new ones.
    if (sameDate.length != 10) {
      beforeNoSameDate.foreach(member => zrem(RedisKey, member))
      nowNoSameDate.foreach { case (storyId, cts) => zadd(RedisKey, cts, storyId) }
    }
  }

  /**
   * Entry point: builds the Spark session, initialises the Redis pool and
   * runs the sync. host/password are placeholders — inject real connection
   * settings before deploying (do not hard-code credentials).
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("GoodBookSpeakWellDateToRedis")
      .config("spark.driver.allowMultipleContexts", true)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val password = ""
    val host = ""
    val port = 6379
    val timeout = 1000

    init(host, port, timeout, password)
    try getResultToRedis(spark)
    finally spark.stop() // ensure the session is stopped even if the sync fails
  }
}