大数据全系列 教程
1869个小节阅读:464.7k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
使用Azkaban来配置任务流进行任务调度。
node1 | node2 | node3 | node4 |
---|---|---|---|
ZK | ZK | ZK | |
NameNode | NameNode | RM | RM |
DataNode/NM | DataNode/NM | DataNode/NM | |
MySQL | Hive | Hive_Client | |
Flume | Sqoop | ||
Spark | Spark | Spark | |
Azkaban | Azkaban | Azkaban | Superset |
集群中提交任务,需要修改项目中的application.conf文件配置项:local.run="false"
。
并将项目打包MusicProject-1.0-SNAPSHOT-jar-with-dependencies.jar
,并上传到node3。
xxxxxxxxxx
[root@node3 ~]# mkdir -p music/
[root@node3 ~]# cd music
[root@node3 music]# ls
MusicProject-1.0-SNAPSHOT-jar-with-dependencies.jar
确保在Hive中创建各个ODS层表及EDS层表
编写执行第一个job的脚本
xxxxxxxxxx
[root@node1 ~]# mkdir music/req1 music/logs
[root@node1 ~]# cd music/req1/
[root@node1 req1]# vim 1produce_clientlog.sh
内容如下:
xxxxxxxxxx
currentDate = `date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "========使用自动生成的今天的日期======="
else
echo "========使用azkaban传入的日期======="
currentDate=$1
fi
echo "被分析日志的日期为${currentDate}"
ssh root@node3 > /root/music/logs/req1.log 2>&1 <<aabbcc
cd /opt/spark-3.2.1/bin/
sh spark-submit --master yarn --class com.itbaizhan.scala.musicproject.ods.ProduceClientLog /root/music/MusicProject-1.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "req1 job1 all done!"
准备抽取mysql数据的sqoop脚本(node3上安装了sqoop)
xxxxxxxxxx
[root@node3 ~]# mkdir music/sqoop/
[root@node3 ~]# cd music/sqoop/
[root@node3 sqoop]# vim ods_mysqltohive_to_song_info_d.sh
内容如下:
xxxxxxxxxx
sqoop import \
--connect jdbc:mysql://node1:3306/songdb?dontTrackOpenResources=true\&defaultFetchSize=10000\&useCursorFetch=true\&useUnicode=yes\&characterEncoding=utf8 \
--username root \
--password 123456 \
--table song \
--target-dir /user/hive_remote/warehouse/data/song/TO_SONG_INFO_D/ \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t'
mysql数据抽取数据到Hive ODS脚本2extract_mysqldata_to_ods.sh
xxxxxxxxxx
[root@node1 req1]# vim 2extract_mysqldata_to_ods.sh
内容如下:
xxxxxxxxxx
ssh root@node3 > /root/music/logs/req1.log 2>&1 <<aabbcc
cd /root/music/sqoop
sh ods_mysqltohive_to_song_info_d.sh
exit
aabbcc
echo "req1 job2 all down!"
清洗歌库歌曲表脚本
xxxxxxxxxx
[root@node1 req1]# vim 3produce_tw_song_baseinfo_d.sh
内容如下:
xxxxxxxxxx
ssh root@node3 > /root/music/logs/req1.log 2>&1 <<aabbcc
cd /opt/spark-3.2.1/bin/
sh spark-submit --master yarn --class com.itbaizhan.scala.musicproject.eds.content.GenerateTwSongBaseinfoD /root/music/MusicProject-1.0-SNAPSHOT-jar-with-dependencies.jar
exit
aabbcc
echo "req1 job3 all down!"
生成歌曲特征日统计表脚本 4produce_tw_song_ftur_d.sh
xxxxxxxxxx
[root@node1 req1]# vim 4produce_tw_song_ftur_d.sh
内容如下:
xxxxxxxxxx
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "===使用自动生成的今天的日期==="
else
echo "===使用azkaban传入的日期==="
currentDate=$1
fi
echo "日期为$currentDate"
ssh root@node3 > /root/music/logs/req1.log 2>&1 <<aabbcc
cd /opt/spark-3.2.1/bin/
sh spark-submit --master yarn --class com.itbaizhan.scala.musicproject.eds.content.GenerateTwSongFturD /root/music/MusicProject-1.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "req1 job4 all down!"
生成歌曲热度表脚本 5produce_tw_song_rsi_d.sh
xxxxxxxxxx
[root@node1 req1]# vim 5produce_tw_song_rsi_d.sh
内容如下:
xxxxxxxxxx
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "===使用自动生成的今天的日期==="
else
echo "===使用azkaban传入的日期==="
currentDate=$1
fi
echo "日期为$currentDate"
ssh root@node3 > /root/music/logs/req1.log 2>&1 <<aabbcc
cd /opt/spark-3.2.1/bin/
sh spark-submit --master yarn --class com.itbaizhan.scala.musicproject.dm.content.GenerateTmSongRsiD /root/music/MusicProject-1.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "req1 job5 all down!"
生成歌手热度表脚本 6produce_tw_singer_rsi_d.sh
xxxxxxxxxx
[root@node1 req1]# vim 6produce_tw_singer_rsi_d.sh
内容如下:
xxxxxxxxxx
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "===使用自动生成的今天的日期==="
else
echo "===使用azkaban传入的日期==="
currentDate=$1
fi
echo "日期为$currentDate"
ssh root@node3 > /root/music/logs/req1.log 2>&1 <<aabbcc
cd /opt/spark-3.2.1/bin/
sh spark-submit --master yarn --class com.itbaizhan.scala.musicproject.dm.content.GenerateTmSingerRsiD /root/music/MusicProject-1.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "req1 job6 all down!"
启动Azkaban
启动exec server
xxxxxxxxxx
[root@node1 azkaban-exec]# bin/start-exec.sh
[root@node2 azkaban-exec]# bin/start-exec.sh
[root@node3 azkaban-exec]# bin/start-exec.sh
扩展:关闭命令bin/shutdown-exec.sh
激活Executor
http://node3:12345/executor?action=activate
xxxxxxxxxx
{"status":"success"} #表示登录成功
【温馨提示】重启Azkaban Executor Server得重新激活
启动 web server
xxxxxxxxxx
[root@node2 azkaban-web]# bin/start-web.sh
测试访问http://node2:8081/,并用azkaban用户登录
编写azkaban 各个job组成任务流:
创建req1文件夹
在文件中创建azkaban.project
xxxxxxxxxx
azkaban-flow-version2.0
在文件中创建basic.flow
xxxxxxxxxx
nodes
name job1
type command
config
command sh /root/music/req1/1produce_clientlog.sh $ mydate
name job2
type command
config
command sh /root/music/req1/2extract_mysqldata_to_ods.sh
name job3
type command
dependsOn
job2
config
command sh /root/music/req1/3prodcue_tw_song_baseinfo_d.sh
name job4
type command
dependsOn
job1
job3
config
command sh /root/music/req1/4produce_tw_song_ftur_d.sh $ mydate
name job5
type command
dependsOn
job4
config
command sh /root/music/req1/5produce_tw_song_rsi_d.sh $ mydate
name job6
type command
dependsOn
job4
config
command sh /root/music/req1/6produce_tw_singer_rsi_d.sh $ mydate
压缩两个文件为req1.zip
azkaban创建项目,将req1.zip上传到项目,并配置参数
继续执行
检查相关表中的数据。