大数据全系列 教程
1869个小节阅读:467.8k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
Flink提供Connector连接器中支持Kafka的Source数据源和数据终端Sink。Flink Kafka Sink核心类:FlinkKafkaProducer
。
官方Scala样例:
代码:
xxxxxxxxxx
package com.itbaizhan.flink.scala.sink
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import java.lang
import java.util.Properties
object FlinkKafkaProducerDemo {
def main(args: Array[String]): Unit = {
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//1.构建执行环境-env
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置全局的并行度
env.setParallelism(1)
//2.读取本地文件返回数据源-dataSource
val stream: DataStream[String] = env.socketTextStream("node3", 8888)
//3.数据转换-transformation
val result: DataStream[(String, Int)] = stream.flatMap(_.split("\\s+"))
.map((_, 1)) //每个单词计数为1
//.keyBy(0)//过时的方法
.keyBy(_._1) //按照key进行分组
.sum(1) //按照value进行求和
//Kafka生产者的配置
val prop = new Properties()
prop.setProperty("bootstrap.servers","node2:9092,node3:9092,node4:9092")
//数据写入kafka,并且是KeyValue格式的数据
prop.setProperty("key.serializer",classOf[ByteArraySerializer].getName)
prop.setProperty("value.serializer",classOf[ByteArraySerializer].getName)
//构建序列化对象
val schema = new KafkaSerializationSchema[(String, Int)] {
override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord("topic_sink",element._1.getBytes,(element._2+"").getBytes())
}
}
//使用准备好的参数构建FlinkKafkaProducer的对象
val producer = new FlinkKafkaProducer[(String, Int)]("topic_sink",
schema, prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)//EXACTLY_ONCE精准一次
result.addSink(producer)
//5.触发执行-execute
env.execute("WordCountStream")
}
}