大数据全系列 教程
1869个小节阅读:467.5k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
xxxxxxxxxx
package com.itbaizhan.scala.musicproject.streaming
import com.alibaba.fastjson.JSON
import com.itbaizhan.scala.musicproject.base.RedisClient
import com.itbaizhan.scala.musicproject.common.ConfigUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, StreamingContext}
import scala.collection.mutable
// Streaming job: consumes user song-play logs in real time; every 10 seconds it computes
// each song's play popularity over the last 10 minutes and stores the result in MySQL.
/** One row of the hot-song result: a song's name and its play count in the current window. */
final case class HotSongInfo(songName: String, times: Int)
/**
 * Real-time hot-song statistics job.
 *
 * Consumes user song-play logs from Kafka and, on a 10-second cadence, counts
 * plays per song over a sliding 10-minute window, writing the top-30 hottest
 * songs to the MySQL table `hotsong`.
 *
 * Consumer offsets are managed manually: they are loaded from Redis at startup
 * and written back to Redis after every micro-batch (at-least-once semantics).
 */
object RealTimeHotSong {
  // Shared session/context; initialized in main() before any other use.
  private var sparkSession: SparkSession = _
  private var sc: SparkContext = _

  def main(args: Array[String]): Unit = {
    // Local and cluster runs share the same configuration; a local run
    // additionally sets master("local"). (The original duplicated the whole
    // builder chain in both branches.)
    val builder = SparkSession.builder()
      .appName("RealTimeHotSongInfo")
      .config("spark.sql.shuffle.partitions", 2)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkSession = (if (ConfigUtils.LOCAL_RUN) builder.master("local") else builder).getOrCreate()
    sc = sparkSession.sparkContext
    sparkSession.sparkContext.setLogLevel("Error")

    // 10-second micro-batches.
    val ssc = new StreamingContext(sc, Durations.seconds(10))

    // Recover the previously committed consumer offsets (partitionId -> offset,
    // both stored as strings) from Redis.
    val currentTopicOffset: mutable.Map[String, String] = RedisClient.getOffSetFromRedis(
      ConfigUtils.REDIS_OFFSET_DB, ConfigUtils.USER_PLAY_SONG_TOPIC)
    // Log the offsets we are resuming from.
    currentTopicOffset.foreach(tp => { println(s" 初始读取到的offset: $tp") })

    // Convert the string pairs into the TopicPartition -> Long map the Assign
    // consumer strategy requires.
    val fromOffsets: Predef.Map[TopicPartition, Long] = currentTopicOffset.map { resultSet =>
      new TopicPartition(ConfigUtils.USER_PLAY_SONG_TOPIC, resultSet._1.toInt) -> resultSet._2.toLong
    }.toMap

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> ConfigUtils.KAFKA_BROKERS,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "MusicGp1",
      "auto.offset.reset" -> "latest",
      // Offsets are committed manually to Redis below, so Kafka's auto-commit
      // (default true) must be disabled.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Start consuming exactly from the offsets recovered from Redis.
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )

    // Per-song play counts over a 10-minute window that slides every 10 seconds.
    stream.map(cr => {
      val jsonObject = JSON.parseObject(cr.value())
      //val songId = jsonObject.getString("songid")
      val songName = jsonObject.getString("songname")
      (songName, 1)
    }).reduceByKeyAndWindow((v1: Int, v2: Int) => { v1 + v2 },
      Durations.minutes(10),
      Durations.seconds(10))
      .foreachRDD(rdd => {
        println("current is start ... ...")
        // Persist the 30 hottest songs of the current window into MySQL table `hotsong`.
        val hotSongInfo: RDD[HotSongInfo] = rdd.map(tp => {
          val songName = tp._1
          val playTimes = tp._2
          HotSongInfo(songName, playTimes)
        })
        // A fresh session per batch so the temp-view name never collides between batches.
        val session = sparkSession.newSession()
        import session.implicits._
        hotSongInfo.toDF().createTempView("temp_song_info")
        // "partition by 1" ranks all songs in a single global partition.
        session.sql(
          """
            |select
            | songname,times,row_number() over (partition by 1 order by times desc ) as rank
            |from temp_song_info
          """.stripMargin)
          .filter("rank <=30")
          .write.format("jdbc")
          .mode(SaveMode.Overwrite)
          .option("url", ConfigUtils.MYSQL_URL)
          .option("user", ConfigUtils.MYSQL_USER)
          .option("password", ConfigUtils.MYSQL_PASSWORD)
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "hotsong")
          .save()
        println("current is finished ... ...")
      })

    // Manually maintain consumer offsets: after each batch, write its end
    // offsets back to Redis.
    // NOTE(review): this is a second, independent output operation on `stream`;
    // offsets are committed regardless of whether the windowed MySQL write above
    // succeeded, so the job is at-least-once at best — confirm this is intended.
    stream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]]) =>
      println("所有业务完成")
      // The cast to HasOffsetRanges is only valid on the RDD produced directly
      // by createDirectStream, which is the case here.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      RedisClient.saveOffsetToRedis(ConfigUtils.REDIS_OFFSET_DB, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
    // awaitTermination() only returns once the context has already stopped,
    // so this final stop() is a no-op safety call.
    ssc.stop()
  }
}