The Kafka connector allows for reading data from and writing data into Kafka topics.
Create the topic station-sink-topic:
[root@node2 ~]# kafka-topics.sh --bootstrap-server node3:9092 --create --partitions 3 --replication-factor 2 --topic station-sink-topic
Created topic station-sink-topic.
[root@node2 ~]# kafka-topics.sh --bootstrap-server node3:9092 --list
...
station-sink-topic
...
Write the SQL DDL statement to create the sink table:
CREATE TABLE tb_station_sink (
`sid` STRING,
`call_type` STRING,
`call_time` BIGINT,
`duration` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'station-sink-topic',
'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
'sink.delivery-guarantee' = 'at-least-once',
'sink.parallelism' = '3',
'format' = 'json'
)
To write the data as JSON strings, add the flink-json Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.14.6</version>
</dependency>
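With 'format' = 'json', each row of tb_station_sink is written to Kafka as one flat JSON object whose keys are the column names. As a rough sketch of that shape (the sample values are made up; real records come from the source topic), not Flink's actual serializer:

```java
public class JsonSinkShape {
    // Mirrors how a four-column row flattens into a JSON object keyed by the
    // DDL column names (sid, call_type, call_time, duration).
    static String toJson(String sid, String callType, long callTime, long duration) {
        return String.format(
            "{\"sid\":\"%s\",\"call_type\":\"%s\",\"call_time\":%d,\"duration\":%d}",
            sid, callType, callTime, duration);
    }

    public static void main(String[] args) {
        // Hypothetical sample record for illustration only.
        System.out.println(toJson("station_1", "success", 1655000000000L, 30L));
    }
}
```

A message of this shape is what the console consumer in the test step below should print for each filtered record.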
Implementation:
package com.itbaizhan.flink.scala.tableapi_sql
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object KafkaSinkDemo {
def main(args: Array[String]): Unit = {
//Build the environment settings object and specify streaming mode
val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build()
//Create the TableEnvironment
val tableEnv: TableEnvironment = TableEnvironment.create(settings)
//Create the source table with SQL DDL
tableEnv.executeSql("CREATE TABLE tb_station (\n" +
" `sid` STRING,\n" +
" `call_type` STRING,\n" +
" `call_time` BIGINT,\n" +
" `duration` BIGINT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'station-topic',\n" +
" 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',\n" +
" 'properties.group.id' = 'gid2',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'csv'\n" +
")"
)
//Create the Kafka sink table with SQL DDL
tableEnv.executeSql("CREATE TABLE tb_station_sink (\n" +
" `sid` STRING,\n" +
" `call_type` STRING,\n" +
" `call_time` BIGINT,\n" +
" `duration` BIGINT\n" +
") WITH(\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'station-sink-topic',\n" +
" 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',\n" +
" 'sink.delivery-guarantee' = 'at-least-once',\n" +
" 'sink.parallelism' = '3',\n" +
" 'format' = 'json'\n" +
")"
)
//Query the source table and insert the result into the sink table, writing it to the Kafka topic station-sink-topic
tableEnv.executeSql(
"insert into tb_station_sink" +
" select sid,call_type,call_time,duration" +
" from tb_station" +
" where call_type='success'")
}
}
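The INSERT statement above keeps only rows whose call_type is 'success'. Outside of Flink, that filter can be sketched in plain Java: split each CSV line from station-topic into its four fields and keep the matching records (the parsing here is an illustration, not Flink's CSV format implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class SuccessFilterSketch {
    // Keep only CSV records whose second field (call_type) is "success",
    // mirroring the WHERE clause of the INSERT INTO tb_station_sink statement.
    static List<String> filterSuccess(List<String> csvLines) {
        List<String> out = new ArrayList<>();
        for (String line : csvLines) {
            String[] f = line.split(",");
            if (f.length == 4 && "success".equals(f[1])) {
                out.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines in the source table's CSV layout.
        List<String> lines = List.of(
            "station_1,success,1655000000000,30",
            "station_2,fail,1655000000500,0");
        System.out.println(filterSuccess(lines)); // only the "success" record survives
    }
}
```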
Test:
Step 1: start a console consumer:
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --topic station-sink-topic
Run the MakeStationsData class.
Run the KafkaSinkDemo class.
Check the output of the consumer started in step 1.