项目场景:

公司的正式项目生产环境中,每天都会产生海量的日志,系统日志使我们排查问题的重要依据。当我们的系统数据量足够大时,通过我们的日志还可以分析出非常多的有价值的信息,可以为我们的项目起到指导性作用。ELK架构就是专为收集、分析和存储日志设计。ELK(ElasticSearch,Logstash,Kibana)架构,包括日志的收集、转发、缓存、提取和过滤,以及最终的搜索和展示功能。


技术栈介绍:

数据流向说明:

  1. 系统采用log4j2进行日志记录,日志包括正常的系统日志app.log和错误error.log。代码存放地址:ELK海量日志系统
  2. filebeat进行日志的收集,filebeat将收集到的日志转存/转储到kafka中。
  3. logstash消费kafka中的日志数据,并且对日志做过滤。
  4. logstash将过滤出来的日志输出到ES中。
  5. kibana通过不同的图表,进行展示ES中保存的数据。

ELK海量日志系统具体实现:

  1. 将上述ELK海量日志系统从github上拉取下来,编译打包,将jar包上传到Linux服务器上,通过java -jar进行启动
  2. Linux上安装filebeat,filebeat下载地址:filebeat 下载 安装好之后,修改filebeat的配置文件,filebeat.yml,如下
###################### Filebeat Configuration Example #########################
filebeat.prospectors:

 1. input_type: log

  paths:
    ## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
    - /usr/local/logs/app-collector.log  ##系统产生的日志路径
  #定义写入 ES 时的 _type 值
  document_type: "app-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    negate: true                                # 是否匹配到
    match: after                                # 合并到上一行的末尾
    max_lines: 2000                             # 最大的行数
    timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
  fields:
    logbiz: collector
    logtopic: app-log-collector   ## 按服务划分用作kafka topic
    evn: dev

 2. input_type: log

  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    negate: true                                # 是否匹配到
    match: after                                # 合并到上一行的末尾
    max_lines: 2000                             # 最大的行数
    timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
  fields:
    logbiz: collector
    logtopic: error-log-collector   ## 按服务划分用作kafka topic
    evn: dev
    
output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]  ##根据自己的实际情况进行设置
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true
  1. 启动kafka,创建两个topic,分别为app-log-collector和error-log-collector。
  2. 启动filebeat,根据上述配置文件filebeat会把对应收集到的日志发送到kafka对应的topic中。
  3. 安装logstash,并新建script文件夹,拷贝下面的logstash_script.conf
## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
input {
   
  kafka {
   
    ## app-log-服务名称
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
	codec => json
	consumer_threads => 1	## 增加consumer的并行消费线程数
	decorate_events => true
    #auto_offset_rest => "latest"
	group_id => "app-log-group"
   }

   kafka {
   
    ## error-log-服务名称
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
	codec => json
	consumer_threads => 1
	decorate_events => true
    #auto_offset_rest => "latest"
	group_id => "error-log-group"
   }

}

filter {
   

  ## 时区转换
  ruby {
   
	code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }

  if "app-log" in [fields][logtopic]{
   
    grok {
   
        ## 表达式
        match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }

  if "error-log" in [fields][logtopic]{
   
    grok {
   
        ## 表达式
        match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }

}

## 测试输出到控制台:
output {
   
  stdout {
    codec => rubydebug }
}


## elasticsearch:
output {
   

  if "app-log" in [fields][logtopic]{
   
	## es插件
	elasticsearch {
   
  	    # es服务地址
        hosts => ["192.168.11.35:9200"]  ##改成自己的,下同
        # 用户名密码 
        user => "elastic"  
        password => "123456"
        ## 索引名,+ 号开头的,就会自动认为后面是时间格式:
        ## javalog-app-service-2021.01.23 
        index => "app-log-%{[fields][logbiz]}-%{index_time}"
        # 是否嗅探集群ip:一般设置true;http://192.168.11.35:9200/_nodes/http?pretty
        # 通过嗅探机制进行es集群负载均衡发日志消息
        sniffing => true
        # logstash默认自带一个mapping模板,进行模板覆盖
        template_overwrite => true
    } 
  }

  if "error-log" in [fields][logtopic]{
   
	elasticsearch {
   
        hosts => ["192.168.11.35:9200"]    
        user => "elastic"
        password => "123456"
        index => "error-log-%{[fields][logbiz]}-%{index_time}"
        sniffing => true
        template_overwrite => true
    } 
  }


}
  1. logstash充当消费者,消费kafka数据,通过一系列的转换之后(文件中所示),最终将日志输出到ES上。
  2. ES上有一个Xpack-watch插件,有兴趣的可以了解下,下面给一个watch的例子
## 创建一个watcher,比如定义一个trigger 每个10s钟看一下input里的数据
PUT _xpack/watcher/watch/school_watcher
{
   
  "trigger": {
   
    "schedule": {
   
      "interval": "10s"
    }
  },
  ## 查看任务信息
  "input": {
   
    "search": {
   
      "request": {
   
        ## 监控具体索引
        "indices": ["school*"],
        ## body里面具体些搜索语句
        "body": {
   
          "size": 0,
          "query": {
   
            "match": {
   
              ## 比如索引里面name 有 hello 则进行报警
              "name": "hello"
            }
          }
        }
      }
    }
  },
  ## 对于上面的查询结果进行比较:
  "condition": {
   
    ## compare进行比较
    "compare": {
   
      ## 上面的query查询的结果会放入到ctx.payload中:
      ## 比如获取 ctx.payload.hits.total ctx.payload._shards.total 等等
      "ctx.payload.hits.total": {
   
        "gt": 0
      }
    }
  },
  ## transform作用:重新查询出文档内容赋值给ctx.payload
  "transform": {
   
    "search": {
   
      "request": {
   
        "indices": ["school*"],
        "body": {
   
          "size": 10,
          "query": {
   
            "match": {
   
              "name": "hello"
            }
          }
        }
      }
    }
  },
  ## 根据上面的查询、比较结果,执行actions里面定义的动作(定义多种报警类型)
  "actions": {
   
    ## 报警名字
    "log_hello": {
   
      ## 防止报警风暴: 设置阈值 15m内曾经报警过, 则不报警
      "throttle_period": "15m",
      ## 报警方式:logging、mail、http等
      "logging": {
   
        ## 报警具体内容:使用 {
   { 查询参数 }} 进行赋值:
        "text": "Found {
   {ctx.payload.hits.total}} hello in the school"
      }
    }
  }
}
  1. 由文档可见,上述的监视器表明,监视ES具体的索引,每10秒检查一次,并且执行actions里面的动作,进行预警。
  2. 安装kibana
  3. 在kibana进行展示ES保存的数据。也可以展示预警数据,具体页面如下: