Use Filebeat to collect log data, ship it through Kafka to Logstash for filtering, and finally output it to Elasticsearch to chart the data.
Data description

# Remove Filebeat's registry so the sample log is re-read from the beginning
cd /usr/local/filebeat/data
rm -r registry

cd /usr/local/filebeat
vim filebeat.yml
# Modify the following settings
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/hadoop/access_2018_05_01.log

output.kafka:
  hosts: ["localhost:9092"]
  topic: "applog"

processors:
- drop_fields:
    fields: ["beat", "input", "source", "offset"]
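Before starting the rest of the pipeline, Filebeat can validate the edited file itself (the test subcommand ships with Filebeat 6.x and later; it should print "Config OK"):

cd /usr/local/filebeat
./filebeat test config -c filebeat.yml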

cd /usr/local/kafka
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
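Optionally, confirm ZooKeeper is answering before starting the broker. A minimal check, assuming nc is installed and the four-letter ruok command is enabled (ZooKeeper versions bundled with Kafka 1.x allow it by default):

echo ruok | nc localhost 2181   # replies "imok" when healthy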

cd /usr/local/kafka/config/
vim server.properties
# Edit the configuration file as follows
listeners=PLAINTEXT://localhost:9092


cd /usr/local/kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties
jps   # Kafka and QuorumPeerMain should both be listed
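The Filebeat output above assumes the applog topic exists, or that the broker auto-creates it (auto.create.topics.enable defaults to true). To create it explicitly — syntax for ZooKeeper-era Kafka such as 1.x; Kafka 2.2+ uses --bootstrap-server localhost:9092 instead of --zookeeper:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic applog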

cd /usr/local/filebeat/
./filebeat -e -c filebeat.yml
# Keep this window open for debugging
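At this point access-log events should be flowing into Kafka. A quick check from another terminal, consuming the applog topic defined above:

cd /usr/local/kafka
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic applog --from-beginning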

cd /usr/local/logstash/
vim logstash-plain-map.conf
# The configuration file contents are as follows
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["applog"]
    group_id => "logstash-file"
    codec => "json"
  }
}
filter {
  grok {
    match => {
      "message" => "%{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?\" %{NUMBER:response} %{NUMBER:bytes}"
    }
  }
  geoip {
    source => "clientip"
    database => "/home/hadoop/GeoLite2-City.mmdb"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-logs-2022"
  }
  stdout {
    codec => rubydebug
  }
}
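For reference, the grok pattern above matches Apache common-log-format lines. A made-up example and the fields it would extract:

# Hypothetical log line:
#   183.60.212.148 - - [01/May/2018:06:25:24 +0800] "GET /index.html HTTP/1.1" 200 3617
# Extracted: clientip=183.60.212.148, timestamp=01/May/2018:06:25:24 +0800, verb=GET,
#   request=/index.html, httpversion=1.1, response=200, bytes=3617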
cd /usr/local/elasticsearch-6.1.0/
./bin/elasticsearch
# Keep this window open for debugging
cd /usr/local/logstash
./bin/logstash -f ./logstash-plain-map.conf
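Once Logstash is running, confirm documents are reaching Elasticsearch from another terminal (standard Elasticsearch REST endpoints; the index name comes from the output section above):

curl 'localhost:9200/_cat/indices?v'
curl 'localhost:9200/logstash-logs-2022/_search?size=1&pretty'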
# Start Kibana (run it from the Kibana installation directory, or make sure it is on PATH)
kibana

In Kibana, first create an index pattern matching logstash-logs-2022 (Management > Index Patterns). If no results are displayed, click the time filter in the upper-right corner and select This week.

Save the traffic-over-time chart: click "Save" and set the name to "Traffic by time period".

Create the top-10 traffic chart: create a new Vertical Bar chart, set the X-axis Aggregation to Terms, the Field to geoip.country_code3.keyword, and the Size to 10.

Create a map of client locations: create a new Coordinate Map, set the bucket Aggregation to Geohash and the Field to geoip.location.
Click Dashboard, choose "Create a dashboard", click Add, and click each saved chart's name to place it on the dashboard.