大数据全系列 教程
1869个小节阅读:467.3k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
在Kafka Connector连接器中提供Source数据源和Sink接收器类,在Flink 1.12版本中提供基于新的接口消费Kafka数据:KafkaSource
。
Flink 1.12 版本中,提供基于新API接口Data Source
实现Kafka 数据源:KafkaSource
,消费数据更加简单。
xxxxxxxxxx
package com.itbaizhan.flink.scala.source
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object KafkaSourceDemo {
def main(args: Array[String]): Unit = {
//添加隐式转换
import org.apache.flink.streaming.api.scala._
//构建环境对象
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度 推荐设置主题的分区相同
env.setParallelism(3)
//构建KafkaSource对象
val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
//设置kafka brokers
.setBootstrapServers("node2:9092,node3:9092,node4:9092")
//设置消费的主题
.setTopics("flink-topic1")
//设置消费者组
.setGroupId("flinkgroup1")
//设置起始消费偏移量
.setStartingOffsets(OffsetsInitializer.earliest())
//设置返序列化类
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
//设置kafka数据源
val dataDS : DataStream[String] = env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(), "KafkaSource")
//输出
dataDS.print()
//触发执行
env.execute("KafkaSourceDemo")
}
}