微服务 教程
1085个小节阅读:196.6k
C语言快速入门
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
准备数据库
Logstash不支持删除同步。如果想实现删除同步,可以在设计数据库表时设置数据软删除,即添加一个字段表示该数据是否删除。删除时不进行物理删除,而是修改该字段的值。
Logstash实现增量导入需要有一个定位字段,通过该字段判断这个数据是否更新过。 案例中使用updateTime(修改时间)作为定位字段,logstash读取数据时会记录所有数据中最大的updateTime。下次读取数据会和上次最大的updateTime对比,如果大于上次最大的updateTime,证明该数据更新过,需要更新到ES中。
在Elasticsearch中创建索引
xxxxxxxxxx
PUT /product
{
"mappings": {
"properties": {
"id": {
"type": "integer",
"store": true,
"index": true
},
"productName": {
"type": "text",
"store": true,
"index": true
},
"price": {
"type": "double",
"store": true,
"index": true
},
"updatetime": {
"type": "date",
"store": true,
"index": true
},
"isDelete": {
"type": "integer",
"store": true,
"index": true
}
}
}
}
配置Logstash
xxxxxxxxxx
cd /usr/local/logstash-7.12.1/config/
vim mysql_product.conf
# 设置为以下内容
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/shopping"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.12.1/lib/mysql-connector-java-5.1.37-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 时区
jdbc_default_timezone => "Asia/Shanghai"
# SQL语句
statement => "select * from product where updatetime >= :sql_last_value;"
# 执行SQL的周期, [秒] 分钟 小时 天 月 年
schedule => "* * * * *"
# 是否使用字段的值作为比较策略
use_column_value => true
# 比较的字段名称
tracking_column => "updatetime"
# 比较的字段类型,numberic为数字,timestamp为日期
tracking_column_type => "timestamp"
# 记录比较字段值的文件,相对寻址路径是logstash的安装路径
last_run_metadata_path => "./product-last-value"
}
}
filter {
# 解决采集数据的时差问题
ruby {
code => "event.set('updatetime', event.get('updatetime').time.localtime + 8*60*60)"
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200","127.0.0.1:9201"]
index => "product"
document_id => "%{id}"
}
}
# 启动Logstash
cd /usr/local/logstash-7.12.1/bin/
./logstash -f ../config/mysql_product.conf