大数据全系列 教程
1869个小节阅读:467.9k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
Flink中提供Connector:RedisSink,将DataStream可以保存到Redis数据库中。
添加Maven依赖
xxxxxxxxxx
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1.0</version>
</dependency>
官方样例:
编码:
xxxxxxxxxx
package com.itbaizhan.flink.scala.sink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object RedisSinkDemo {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流计算)上下文执行环境
val streamEnv= StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
//读取数据
val stream = streamEnv.socketTextStream("node3",8888)
//转换计算
val result = stream.flatMap(_.split(","))
.map((_, 1))
.keyBy(_._1)
.sum(1)
//连接redis的配置
val config = new FlinkJedisPoolConfig.Builder().setDatabase(1).setHost("node4").setPort(6379).build()
//写入redis
result.addSink(new RedisSink[(String, Int)](config,new RedisMapper[(String, Int)] {
override def getCommandDescription = new RedisCommandDescription(RedisCommand.HSET,"t_wc") //t_wc是表名
override def getKeyFromData(data: (String, Int)) = {
data._1 //单词
}
override def getValueFromData(data: (String, Int)) = {
data._2+"" //单词出现的次数
}
}))
streamEnv.execute()
}
}
node3上执行
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
启动redis
xxxxxxxxxx
[root@node4 ~]# cd /usr/local/redis/
[root@node4 redis]# redis-server redis.conf
运行RedisSinkDemo
node3输出数据:
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
a b a d
查看redis中的数据