项目场景:
公司的正式项目生产环境中,每天都会产生海量的日志,系统日志使我们排查问题的重要依据。当我们的系统数据量足够大时,通过我们的日志还可以分析出非常多的有价值的信息,可以为我们的项目起到指导性作用。ELK架构就是专为收集、分析和存储日志设计。ELK(ElasticSearch,Logstash,Kibana)架构,包括日志的收集、转发、缓存、提取和过滤,以及最终的搜索和展示功能。
技术栈介绍:
数据流向说明:
- 系统采用log4j2进行日志记录,日志包括正常的系统日志app.log和错误error.log。代码存放地址:ELK海量日志系统
- filebeat进行日志的收集,filebeat将收集到的日志转存/转储到kafka中。
- logstash消费kafka中的日志数据,并且对日志做过滤。
- logstash将过滤出来的日志输出到ES中。
- kibana通过不同的图表,进行展示ES中保存的数据。
ELK海量日志系统具体实现:
- 将上述ELK海量日志系统从github上拉取下来,编译打包,将jar包上传到Linux服务器上,通过java -jar进行启动
- Linux上安装filebeat,filebeat下载地址:filebeat 下载 安装好之后,修改filebeat的配置文件,filebeat.yml,如下
###################### Filebeat Configuration Example #########################
filebeat.prospectors:
1. input_type: log
paths:
## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
- /usr/local/logs/app-collector.log ##系统产生的日志路径
#定义写入 ES 时的 _type 值
document_type: "app-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: app-log-collector ## 按服务划分用作kafka topic
evn: dev
2. input_type: log
paths:
- /usr/local/logs/error-collector.log
document_type: "error-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: error-log-collector ## 按服务划分用作kafka topic
evn: dev
output.kafka:
enabled: true
hosts: ["192.168.11.51:9092"] ##根据自己的实际情况进行设置
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true
- 启动kafka,创建两个topic,分别为app-log-collector和error-log-collector。
- 启动filebeat,根据上述配置文件filebeat会把对应收集到的日志发送到kafka对应的topic中。
- 安装logstash,并新建script文件夹,拷贝下面的logstash_script.conf
## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
input {
kafka {
## app-log-服务名称
topics_pattern => "app-log-.*"
bootstrap_servers => "192.168.11.51:9092"
codec => json
consumer_threads => 1 ## 增加consumer的并行消费线程数
decorate_events => true
#auto_offset_rest => "latest"
group_id => "app-log-group"
}
kafka {
## error-log-服务名称
topics_pattern => "error-log-.*"
bootstrap_servers => "192.168.11.51:9092"
codec => json
consumer_threads => 1
decorate_events => true
#auto_offset_rest => "latest"
group_id => "error-log-group"
}
}
filter {
## 时区转换
ruby {
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
if "app-log" in [fields][logtopic]{
grok {
## 表达式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in [fields][logtopic]{
grok {
## 表达式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
}
## 测试输出到控制台:
output {
stdout {
codec => rubydebug }
}
## elasticsearch:
output {
if "app-log" in [fields][logtopic]{
## es插件
elasticsearch {
# es服务地址
hosts => ["192.168.11.35:9200"] ##改成自己的,下同
# 用户名密码
user => "elastic"
password => "123456"
## 索引名,+ 号开头的,就会自动认为后面是时间格式:
## javalog-app-service-2021.01.23
index => "app-log-%{[fields][logbiz]}-%{index_time}"
# 是否嗅探集群ip:一般设置true;http://192.168.11.35:9200/_nodes/http?pretty
# 通过嗅探机制进行es集群负载均衡发日志消息
sniffing => true
# logstash默认自带一个mapping模板,进行模板覆盖
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
elasticsearch {
hosts => ["192.168.11.35:9200"]
user => "elastic"
password => "123456"
index => "error-log-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
}
}
- logstash充当消费者,消费kafka数据,通过一系列的转换之后(文件中所示),最终将日志输出到ES上。
- ES上有一个Xpack-watch插件,有兴趣的可以了解下,下面给一个watch的例子
## 创建一个watcher,比如定义一个trigger 每个10s钟看一下input里的数据
PUT _xpack/watcher/watch/school_watcher
{
"trigger": {
"schedule": {
"interval": "10s"
}
},
## 查看任务信息
"input": {
"search": {
"request": {
## 监控具体索引
"indices": ["school*"],
## body里面具体些搜索语句
"body": {
"size": 0,
"query": {
"match": {
## 比如索引里面name 有 hello 则进行报警
"name": "hello"
}
}
}
}
}
},
## 对于上面的查询结果进行比较:
"condition": {
## compare进行比较
"compare": {
## 上面的query查询的结果会放入到ctx.payload中:
## 比如获取 ctx.payload.hits.total ctx.payload._shards.total 等等
"ctx.payload.hits.total": {
"gt": 0
}
}
},
## transform作用:重新查询出文档内容赋值给ctx.payload
"transform": {
"search": {
"request": {
"indices": ["school*"],
"body": {
"size": 10,
"query": {
"match": {
"name": "hello"
}
}
}
}
}
},
## 根据上面的查询、比较结果,执行actions里面定义的动作(定义多种报警类型)
"actions": {
## 报警名字
"log_hello": {
## 防止报警风暴: 设置阈值 15m内曾经报警过, 则不报警
"throttle_period": "15m",
## 报警方式:logging、mail、http等
"logging": {
## 报警具体内容:使用 {
{ 查询参数 }} 进行赋值:
"text": "Found {
{ctx.payload.hits.total}} hello in the school"
}
}
}
}
- 由文档可见,上述的监视器表明,监视ES具体的索引,每10秒检查一次,并且执行actions里面的动作,进行预警。
- 安装kibana
- 在kibana进行展示ES保存的数据。也可以展示预警数据,具体页面如下: