大数据全系列 教程
1869个小节阅读:467.9k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
启动Zookeeper集群
xxxxxxxxxx
[root@node2 ~]# zkServer.sh start
[root@node3 ~]# zkServer.sh start
[root@node4 ~]# zkServer.sh start
启动kafka集群
xxxxxxxxxx
[root@node2 ~]# kafka.sh start
--------start node2's kafka----------
--------start node3's kafka----------
--------start node4's kafka----------
创建主题flink-topic1
xxxxxxxxxx
[root@node2 ~]# kafka-topics.sh --bootstrap-server node3:9092 --replication-factor 1 --partitions 3 --create --topic flink-topic1
查看所有主题
xxxxxxxxxx
[root@node2 ~]# kafka-topics.sh --list --bootstrap-server node3:9092
flink-topic1
......
启动kafka生产者
xxxxxxxxxx
[root@node2 ~]# kafka-console-producer.sh --topic flink-topic1 --broker-list node3:9092,node4:9092
>
修改程序FlinkKafkaConsumer,然后运行
xxxxxxxxxx
dataDS.print()
/*dataDS.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
.sum(1)
.print()*/
在kafka生产者添加数据
xxxxxxxxxx
[root@node2 ~]# kafka-console-producer.sh --topic flink-topic1 --broker-list node3:9092,node4:9092
>a
>b
>c
>d
>e
>f
>g
>
观察IDEA控制台
xxxxxxxxxx
1> a
1> b
3> c
3> d
2> e
2> f
1> g
停止运行FlinkKafkaConsumer,做如下修改,再次运行
xxxxxxxxxx
//dataDS.print()
dataDS.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
.sum(1)
.print()
在kafka生产者添加数据
xxxxxxxxxx
>hello java hello python hello java
>hello java hello flink
>
观察IDEA控制台
xxxxxxxxxx
1> (hello,1)
1> (java,1)
1> (hello,2)
1> (python,1)
1> (hello,3)
1> (java,2)
3> (flink,1)
1> (hello,4)
1> (java,3)
1> (hello,5)