大数据全系列 教程
1869个小节阅读:467.5k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
从Kafka 消费数据时,可以设置从Kafka Topic中哪个偏移量位置开始消费数据,默认情况下,第一次运行,从Kafka Topic==最大偏移量==开始消费数据。
earliest
:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉
了,所以可以理解为从现存的数据里最小位置开始消费。per-partition assignment
:对每个分区都指定一个offset,再从offset位置开始消费。latest
:从最末位置开始消费。相关方法:
setStartFromGroupOffsets
:
__consumer_offsets
;setStartFromEarliest
:从最小偏移量消费数据
setStartFromLatest
:从最大偏移量消费数据
setStartFromTimestamp
:消费每条数据时间戳大于指定时间戳
setStartFromSpecificOffsets
:从指定偏移量开始消费数据,偏移量值大于设置偏移量
代码演示:
xxxxxxxxxx
package com.itbaizhan.flink.scala.source
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import java.lang
import java.util.Properties
object FlinkKafkaConsumerOffsetDemo {
def main(args: Array[String]): Unit = {
//构建环境对象
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度 推荐设置主题的分区相同
env.setParallelism(3)
//配置kafka链接的参数
val prop = new Properties()
prop.setProperty("bootstrap.servers","node2:9092,node3:9092,node4:9092")
prop.setProperty("group.id","flinkgroup1")
//构建FlinkKafkaConsumer对象
val kafkaConsumer = new FlinkKafkaConsumer[String]("flink-topic1", new SimpleStringSchema(), prop)
//TODO 1.Flink从topic中现存的数据里最小位置开始消费
//kafkaConsumer.setStartFromEarliest()
//TODO 2.Flink从topic中最新的数据开始消费
//kafkaConsumer.setStartFromLatest()
//TODO 3.Flink从topic中指定group上次消费的位置开始消费,必须配置group.id参数
//kafkaConsumer.setStartFromGroupOffsets()
//TODO 4.Flink从topic中指定时间戳
//kafkaConsumer.setStartFromTimestamp(1671609649341L)
//TODO 5.Flink从topic的分区指定具体的偏移量
val offsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
offsets.put(new KafkaTopicPartition("flink-topic1",0),2L)
offsets.put(new KafkaTopicPartition("flink-topic1",1),3L)
offsets.put(new KafkaTopicPartition("flink-topic1",2),2L)
//勿忘我
kafkaConsumer.setStartFromSpecificOffsets(offsets)
import org.apache.flink.streaming.api.scala._
//添加kafka源
val dataDS = env.addSource(kafkaConsumer)
//输出
dataDS.print()
//触发执行
env.execute("FlinkKafkaConsumerOffsetDemo")
}
}
注意:上面所设置消费偏移量位置,表示不考虑流式程序从Checkpoint检查点或保存点SavePoint恢复。
Offset Explorer 2扩展:
下载网址:https://www.kafkatool.com/download.html
配置网址:https://blog.csdn.net/m0_67401660/article/details/126061426