我已经配置了logstash.conf以动态插入数据库的数据,但是问题是:
当我更改表的一行时,该行不会在索引中更新,因为我仅在sql_last_value之后插入新值,尽管我是关于触发器的,但我不确定该怎么做.
input {
jdbc {
jdbc_connection_string =>"jdbc:MysqL://localhost:3306/blog"
jdbc_user =>"root"
jdbc_password =>""
jdbc_driver_library =>"C:\Users\saidb\Downloads\mysql-connector-java-5.1.47\mysql-connector-java-5.1.47.jar"
jdbc_driver_class =>"com.MysqL.jdbc.Driver"
schedule =>"* * * * *"
statement =>"SELECT * FROM blog_pro WHERE id >:sql_last_value"
use_column_value =>true
tracking_column =>id
}
}
output {
elasticsearch {
hosts =>"localhost:9200"
index =>"blog_pro"
document_type =>"data"
}
}
解决方法:
如果使用id选择行,则不能这样做.
您有2种选择,
>每次选择所有行,然后使用查询SELECT * FROM blog_pro将它们发送到ES,根据您的情况,我认为这不是一个好选择.
>创建一个新列last_modified_time,其中包含记录(行)的最后修改时间戳.然后使用它来过滤行.注意属性tracking_column_type => “时间戳”
语句=>“ SELECT * FROM blog_pro WHERE last_modiefied_time>:sql_last_value”
use_column_value => true
tracking_column => last_modified_time
tracking_column_type => “时间戳”
这是完整的logstash配置
input {
jdbc {
jdbc_connection_string =>"jdbc:MysqL://192.168.3.57:3306/blog_pro"
jdbc_user =>"dush"
jdbc_password =>"dush"
jdbc_driver_library =>"F:\logstash-6.2.2\bin\mysql-connector-java-5.1.6.jar"
jdbc_driver_class =>"com.MysqL.jdbc.Driver"
schedule =>"* * * * *"
statement =>"SELECT * FROM blog_pro WHERE last_modified_time >:sql_last_value"
use_column_value =>true
tracking_column =>last_modified_time
tracking_column_type => "timestamp"
}
}
output
{
#output to elasticsearch
elasticsearch {
hosts => [ "192.168.1.245:9201" ]
action=>update
# "%{id}" - > primary key of the table
document_id => "%{id}"
doc_as_upsert =>true
}
}
请注意,您可能需要清除索引并使用此配置开始索引.我对此进行了测试,效果很好.
Elasticsearch版本= 5.x.x
logstash版本= 6.2.2
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。