大数据全系列 教程
1869个小节阅读:468.1k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:采用Netcat 数据源发送数据,基于Flink计算引擎,使用流计算(Streaming )实现词频统计WordCount。
1.构建执行环境-env 2.读取本地文件返回数据源-dataSource 3.数据转换-transformation 4.数据接收器-sink 5.触发执行-execute
在src/main/java下创建Java类com.itbaizhan.flink.java.base.WordCountStream
xxxxxxxxxx
package com.itbaizhan.flink.java.base;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountStream {
public static void main(String[] args) throws Exception {
//1.构建执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//2.读取本地文件返回数据源-dataSource
DataStreamSource<String> dataSource = env.socketTextStream("node3", 8888);
//3.数据转换-transformation
//3.1分割单词
SingleOutputStreamOperator<String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
//按照空格进行拆分
String[] words = line.trim().split("\\s+");
//遍历数组,注意添加到collector中
for (String word : words) {
collector.collect(word);
}
}
});
//3.2 转化为二元组 hello->(hello,1)
SingleOutputStreamOperator<Tuple2<String, Integer>> tupStream = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word,1);
}
});
//3.3 将相同的key分为一组
KeyedStream<Tuple2<String, Integer>, String> keyedStream = tupStream.keyBy(tup -> tup.f0);
//3.4 求和
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);
//4.数据接收器-sink
sum.print();
//5.触发执行-execute 流处理一定需要触发执行
env.execute("WordCountStream");
}
}
测试步骤:
node3上执行命令
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
运行程序WordCountStream
node3上输入内容
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
a b c
a d d
w h k
a b c
a b a
a d w
看IDEA控制台