大数据全系列 教程
1869个小节阅读:468k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:采用Netcat 数据源发送数据,基于Flink计算引擎,使用流计算(Streaming )实现词频统计WordCount。
1.构建执行环境-env 2.读取本地文件返回数据源-dataSource 3.数据转换-transformation 4.数据接收器-sink 5.触发执行-execute
注意: Flink流式处理数据时,需要导入隐式转换:org.apache.flink.streaming.api.scala._
在src/main/scala下创建Scala类com.itbaizhan.flink.scala.base.WordCountStream
编码:
xxxxxxxxxx
package com.itbaizhan.flink.scala.base
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object WordCountStream {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming流计算的上下文对象
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置全局并行度
env.setParallelism(1)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取node3通过socket传输过来的数据
val stream: DataStream[String] = env.socketTextStream("node3",8888)
//转换计算
val result: DataStream[(String, Int)] = stream.flatMap(_.split("\\s+"))
.map((_, 1)) //每个单词计数为1
//.keyBy(0) //按照第一列分组,过时了
.keyBy(_._1) //按照第一列分组
.sum(1)//按照第二列统计
//打印结果到控制台
result.print()
//启动流式处理,如果没有改行代码上面的程序不会运行
env.execute("wordcount")
}
}
测试步骤:
node3上执行命令
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
运行程序WordCountStream
node3上输入内容
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
a b c
a d d
w h k
a b c
a b a
a d w
看IDEA控制台
xxxxxxxxxx
(a,1)
(b,1)
(c,1)
(a,2)
(d,1)
(d,2)
(w,1)
(h,1)
(k,1)
(a,3)
(b,2)
(c,2)
(a,4)
(b,3)
(a,5)
(a,6)
(d,3)
(w,2)