Big Data Full Series Tutorial
Add the dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.12.7</version>
</dependency>
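For readers who build with sbt rather than Maven, the same two coordinates might be declared roughly as follows. This is only a sketch: it assumes an sbt project on Scala 2.12, and the `scalaVersion` value is an assumption that is not stated in the tutorial.

```scala
// build.sbt (hypothetical sbt equivalent of the Maven dependencies above)
// Assumption: the project targets Scala 2.12, matching the _2.12 artifact suffix.
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version, giving spark-streaming-kafka-0-10_2.12
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.2.1",
  // jackson-core is a plain Java artifact, so a single % is used
  "com.fasterxml.jackson.core" % "jackson-core" % "2.12.7"
)
```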
Write the code:
package com.itbaizhan.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectAPIDemo {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DirectAPIDemo")
    //2. Create the StreamingContext with a 3-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //3. Define the Kafka parameters
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node2:9092,node3:9092,node4:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "itbaizhan",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    //4. Read from Kafka to create the DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        // Location strategy: let the framework distribute partitions evenly across executors
        LocationStrategies.PreferConsistent,
        // Consumer strategy: subscribe to topic "topicKafka" using the parameters in kafkaPara
        ConsumerStrategies.Subscribe[String, String](Set("topicKafka"), kafkaPara))
    //5. Extract the value of each message (the key is not used)
    // Equivalent long form:
    //val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())
    //6. Compute the WordCount for each batch
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    //7. Start the job and block until it is terminated
    ssc.start()
    ssc.awaitTermination()
  }
}
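
The transformation in step 6 can be tried without Kafka or a Spark cluster. The sketch below mirrors the `flatMap`, `map`, `reduceByKey` chain on an ordinary Scala collection that stands in for the message values of one batch. The object name `WordCountSketch`, the helper `countWords`, and the sample input are hypothetical and not part of the tutorial; `groupBy` followed by a sum is used here as a plain-collection substitute for `reduceByKey`.

```scala
object WordCountSketch {
  // Counts words in one batch of message values.
  // Plain-collection stand-in for:
  //   valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  def countWords(values: Seq[String]): Map[String, Int] =
    values
      .flatMap(_.split(" "))   // split every message into words
      .map((_, 1))             // pair each word with a count of 1
      .groupBy(_._1)           // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum counts per word

  def main(args: Array[String]): Unit = {
    // Hypothetical messages, as if received in a single 3-second batch
    val batch = Seq("hello spark", "hello kafka")
    println(countWords(batch))
  }
}
```

In the streaming job, `reduceByKey` is applied to each batch separately, so the printed counts cover only the messages that arrived within that batch interval and are not accumulated across batches.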