wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.tar.gz
Extract it and edit config/elasticsearch.yml.
The data directory below has to be created first. Note that ES cannot be started as the root user.
If startup fails, the usual cause is that some files lack permissions; add them with chmod 777. A minimal sequence is sketched below.
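A sketch of the whole sequence (it assumes the tarball is in the current directory and that you are logged in as a regular, non-root user):

tar -zxvf elasticsearch-6.2.4.tar.gz
cd elasticsearch-6.2.4
mkdir -p data logs
# if some files lack permissions, for example:
# chmod -R 777 config data logs
# edit config/elasticsearch.yml as below, then start in the background:
./bin/elasticsearch -d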

cluster.name: my-application
node.name: node-1
path.data: /home/tangsong.math/repos/elasticsearch-6.2.4/data
path.logs: /home/tangsong.math/repos/elasticsearch-6.2.4/logs
network.host: 10.227.26.77
http.port: 9200

Open http://10.227.26.77:9200/ in a browser:

{
"name" : "node-1",
"cluster_name" : "my-application",
"cluster_uuid" : "7rKVCgfdS1Wco0jOtfp2Bg",
"version" : {
"number" : "6.2.4",
"build_hash" : "ccec39f",
"build_date" : "2018-04-12T20:37:28.497551Z",
"build_snapshot" : false,
"lucene_version" : "7.2.1",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}

We also need to install a few plugins. Most of them are based on Node.js, so install Node first.
wget https://nodejs.org/dist/v14.17.0/node-v14.17.0-linux-x64.tar.xz
xz -d node-v14.17.0-linux-x64.tar.xz
tar -xvf node-v14.17.0-linux-x64.tar

Add NODE_HOME to the environment variables:
vi /etc/profile
export NODE_HOME=/home/tangsong.math/repos/node-v14.17.0-linux-x64
export PATH=$NODE_HOME/bin:$PATH
source /etc/profile
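To verify that Node is picked up from the new PATH:

node -v
npm -v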

Download Kibana; its version must match the ES version.
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.4-linux-x86_64.tar.gz
Edit the kibana.yml file under config:

server.port: 5601
server.host: "10.227.26.77"
elasticsearch.url: "http://10.227.26.77:9200"

Start it with ./bin/kibana
then open http://10.227.26.77:5601.

Index (indices) -------------------------------- Database
Type (type) ------------------------------------ Table
Document (document) ---------------------------- Row
Field (field) ---------------------------------- Column

Details:

indices: the plural of index; refers to a collection of indexes.
type: mimics the table concept in MySQL. One index can hold different types (e.g. a product type and an order type with different data formats), but this makes the index messy, so the concept will be removed in future versions.
document: the raw data stored in the index; each product record, for example, is one document.
field: an attribute of a document.
mappings: the fields' data types, attributes, whether they are indexed, whether they are stored, and similar characteristics.
https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html

Creating an index
PUT twitter
{
"settings" : {
"index" : {
"number_of_shards" : 3,
"number_of_replicas" : 2
}
}
}
This creates an index named twitter with 3 shards and 2 replicas (3 copies of each shard in total). The number of shards cannot be changed after creation.
Since we only have a single instance, set the number of replicas to 0.
In the Kibana console format:
PUT twitter
{
"settings" : {
"index" : {
"number_of_shards" : 3,
"number_of_replicas" : 0
}
}
}

With curl:
curl -X PUT "localhost:9200/twitter?pretty" -H 'Content-Type: application/json' -d'
{
"settings" : {
"index" : {
"number_of_shards" : 3,
"number_of_replicas" : 0
}
}
}
'
An index has aliases and mappings; the mappings define the index's data structure.
The response is:
{
"acknowledged": true,
"shards_acknowledged": true,
"index": "twitter"
}

Getting an index
GET /twitter

{
"twitter": {
"aliases": {},
"mappings": {},
"settings": {
"index": {
"creation_date": "1621616190728",
"number_of_shards": "3",
"number_of_replicas": "0",
"uuid": "wD9pR9FMTv2wUpgbY4Ao9g",
"version": {
"created": "6020499"
},
"provided_name": "twitter"
}
}
}
}

Deleting an index
DELETE /twitter
{
"acknowledged": true
}

Checking whether an index exists
HEAD twitter
If it does not exist, this returns HTTP 404 Not Found; if it exists, the body is empty (HTTP 200).
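With curl, the -I flag sends a HEAD request and prints only the status line and headers:

curl -I "http://10.227.26.77:9200/twitter"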

Listing all indexes
GET _cat/indices

Adding a document to an index
The URL is usually index_name/type_name/id.
PUT twitter/_doc/1
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
Now look at the index structure again with GET /twitter.
We can see that ES inferred the data types automatically: user and message are text, post_date is date.

{
"twitter": {
"aliases": {},
"mappings": {
"_doc": {
"properties": {
"message": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"post_date": {
"type": "date"
},
"user": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
},
"settings": {
"index": {
"creation_date": "1621616510638",
"number_of_shards": "3",
"number_of_replicas": "0",
"uuid": "jEwWvcknRI2nkTVVo_heoA",
"version": {
"created": "6020499"
},
"provided_name": "twitter"
}
}
}
}

What if we insert a document with a different structure?
PUT twitter/_doc/2
{
"user" : "tangsong",
"post_date" : "2009-12-15T14:12:12",
"country" : "cn"
}
It still succeeds.
Running GET /twitter again shows the index now has 4 fields.
Update requests look the same as inserts: if the id already exists, the document is updated.
To have the id generated automatically, change the request to POST twitter/_doc/ (a sketch follows).
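For example (the field values are only illustrative):

POST twitter/_doc/
{
"user" : "somebody",
"post_date" : "2009-12-16T14:12:12",
"message" : "document with an auto-generated id"
}

The response contains the generated _id.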

Querying data
GET twitter/_doc/1

{
"_index": "twitter",
"_type": "_doc",
"_id": "1",
"_version": 3,
"found": true,
"_source": {
"user": "zhuzhu",
"post_date": "2009-11-15T14:12:12",
"message": "trying out Logstash"
}
}
Fields a document does not have simply do not appear in its result (doc 1 has no country and doc 2 has no message).
You can also search with a condition:
GET twitter/_doc/_search?q=user:zhuzhu

term and terms queries
This is so-called DSL querying (Domain Specific Language).
GET /twitter/_search
{
"query" : {
"term" : { "user" : "zhuzhu" }
}
}
terms lets you specify multiple keywords:
GET /twitter/_search
{
"query" : {
"terms" : { "user" : ["zhuzhu","tangsong"] }
}
}
A match query is similar, but the input is analyzed, so documents whose user is zhuzhu or tangsong both match, each with a score indicating how well it matched.

GET /twitter/_search
{
"query" : {
"match" : { "user" : "zhuzhu tangsong" }
}
}

Filter queries
GET /twitter/_search
{
"query": {
"bool": {
"filter": {
"term": {
"user": "tangsong"
}
}
}
}
}
A bool query can contain must, should, and must_not clauses.
should means that matching any one of its sub-clauses is enough.
bool queries can be nested (a should can itself contain a bool), as sketched below.
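A sketch using the sample documents above: match documents whose user is zhuzhu, or whose country is cn and whose user is tangsong:

GET /twitter/_search
{
"query": {
"bool": {
"should": [
{ "match": { "user": "zhuzhu" } },
{
"bool": {
"must": [
{ "term": { "country": "cn" } },
{ "term": { "user": "tangsong" } }
]
}
}
]
}
}
}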
Aggregation queries (aggs) are covered in a later section. First, the Maven setup for the Java REST client:
<properties>
    <elasticsearch.version>6.2.4</elasticsearch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>

    <!-- elastic search client -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11-beta-1</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Note that the port must be 9200, otherwise you may get the error "This is not a HTTP port".
TransportClient uses port 9300, but RestClient uses port 9200.

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class Test {
@org.junit.Test
public void test2() throws IOException {

    RestClientBuilder restClientBuilder =
            RestClient.builder(new HttpHost("10.227.26.77", 9200, "HTTP"))
                    // request retry max total timeout
                    .setMaxRetryTimeoutMillis(300000)
                    .setRequestConfigCallback(
                            (requestConfigBuilder) ->
                                    requestConfigBuilder
                                            // socket connect established timeout
                                            .setConnectTimeout(10000)
                                            // data package interval timeout
                                            .setSocketTimeout(120000)
                                            // timeout of requesting a connection from the connection manager
                                            .setConnectionRequestTimeout(60000));
    RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);
    Map<String, Object> settings = new HashMap<>();
    settings.put("ignore_unavailable", true);
    IndicesOptions indicesOptions = IndicesOptions.fromMap(settings, SearchRequest.DEFAULT_INDICES_OPTIONS);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();


    SearchRequest searchRequest = new SearchRequest()
            .indicesOptions(indicesOptions)
            .indices("twitter")
            .types("_doc")
            .source(searchSourceBuilder)
            .scroll(TimeValue.timeValueSeconds(60));
    SearchResponse response = client.search(searchRequest);
    System.out.println(response);
}

}

To make testing easier, let's reset the test data:
DELETE /twitter

PUT twitter/_doc/1
{
"user" : "user1",
"dim1" : "1",
"dim2" : "2",
"dim3" : "3",
"cost" : 72
}

PUT twitter/_doc/2
{
"user" : "user2",
"dim1" : "1",
"dim2" : "3",
"dim3" : "2",
"cost" : 10
}

PUT twitter/_doc/3
{
"user" : "user3",
"dim1" : "2",
"dim2" : "1",
"dim3" : "3",
"cost" : 12
}

PUT twitter/_doc/4
{
"user" : "user4",
"dim1" : "2",
"dim2" : "3",
"dim3" : "1",
"cost" : 75
}

PUT twitter/_doc/5
{
"user" : "user5",
"dim1" : "3",
"dim2" : "1",
"dim3" : "2",
"cost" : 59
}

PUT twitter/_doc/6
{
"user" : "user6",
"dim1" : "3",
"dim2" : "2",
"dim3" : "1",
"cost" : 42
}

The following request is malformed:
GET /twitter/_search
{
"query": {
"term": {
"dim3": "1"
},
"match": {
"user": "user1 "
}
}
}
Summary: query may contain a single leaf clause such as term, terms, or match, but not several of them (combinations must go inside a bool).
term and match cannot sit directly under bool; they must be wrapped in must, must_not, or should, otherwise you get:
[bool] query does not support [term]
When a bool contains both must and should, the should clauses do not affect which documents match (they only influence scoring):
GET /twitter/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"dim3": "1"
}
}
],
"should": [
{
"match": {
"dim1": "1"
}
}
]
}
}
}
This matches every document with dim3 = 1; dim1 does not have to be 1.
However, when must_not appears together with should, the should clauses do take effect; and must together with must_not both take effect. A sketch:
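For example, exclude dim3 = 1 and require at least one should clause to match:

GET /twitter/_search
{
"query": {
"bool": {
"must_not": [
{ "term": { "dim3": "1" } }
],
"should": [
{ "match": { "dim1": "1" } }
]
}
}
}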
To limit the result set, put from/size outside query, i.e. at the top level of the request body:
"from": 0,
"size": 1

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
    TermQueryBuilder termQueryBuilder=new TermQueryBuilder("dim1","1");
    boolQueryBuilder.must(termQueryBuilder);

    searchSourceBuilder.query(boolQueryBuilder);
    SearchRequest searchRequest = new SearchRequest()
            .indicesOptions(indicesOptions)
            .indices("twitter")
            .types("_doc")
            .source(searchSourceBuilder)
            .scroll(TimeValue.timeValueSeconds(60));
    SearchResponse response = client.search(searchRequest);
    for(SearchHit searchHit: response.getHits()){
        System.out.println(searchHit.getIndex());//twitter
        System.out.println(searchHit.getClusterAlias());//null
        System.out.println(searchHit.getSourceAsString());//{"user":"user2","dim1":"1","dim2":"3","dim3":"2","cost":10}
    }

Specifying a mapping
An ES mapping describes the data format of each document. It has a dynamic property that can be set to true, false, or strict:
true: new fields in inserted data are added to the type's mapping. false: new fields are not added to the mapping, but they are still stored in the document and returned correctly when searching. strict: an error is raised. The default is true.
We can specify the mapping when creating the index with PUT:
PUT index_name
{
"mappings": {
"type_name": {
"properties": {
"field_name": {
"type": "field_type_name"
}
}
}
}
}

PUT /lib6
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 0
},
"mappings": {
"books": {
"dynamic": false,
"properties": {
"title": {
"type": "text"
},
"name": {
"type": "text",
"analyzer": "standard"
},
"publish_date": {
"type": "date",
"index": false
},
"price": {
"type": "double"
},
"number": {
"type": "integer"
}
}
}
}
}

Modify the mapping, setting dynamic to strict:
PUT /lib6/books/_mapping
{
"dynamic": "strict"
}
Now inserting a document that contains a field not in the mapping fails:
PUT lib6/books/2
{
"name" : "user1",
"publish_date" : "2021-05-05T00:00:00",
"price" : "2",
"field_not_exists" : "1"
}

{
"error": {
"root_cause": [
{
"type": "strict_dynamic_mapping_exception",
"reason": "mapping set to strict, dynamic introduction of [field_not_exists] within [books] is not allowed"
}
],
"type": "strict_dynamic_mapping_exception",
"reason": "mapping set to strict, dynamic introduction of [field_not_exists] within [books] is not allowed"
},
"status": 400
}

ES aggregations
For example, we may want the maximum cost where dim1 = 1, or the per-group sum of cost after grouping by dim1.
Fields used for grouping must be keyword fields, so in practice we group by dim1.keyword.
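The group-by-sum case is the next request below; for the max case (maximum cost where dim1 = 1), a sketch:

GET /twitter/_search
{
"query": {
"term": { "dim1": "1" }
},
"aggs": {
"cost_max": {
"max": { "field": "cost" }
}
}
}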

GET /twitter/_search
{
"query": {
"term": {
"dim2": "1"
}
},
"aggs": {
"group_info": {
"terms": {
"field": "dim1.keyword"
},
"aggs": {
"cost_sum": {
"sum": {
"field": "cost"
}
}
}
}
}
}

The response is below. Besides hits, there is an aggregations section carrying the aggregated values. Each group of an ES group by is called a bucket (documents with the same dim1 land in the same bucket).
{
"took": 4,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 0.2876821,
"hits": [
{
"_index": "twitter",
"_type": "_doc",
"_id": "5",
"_score": 0.2876821,
"_source": {
"user": "user5",
"dim1": "3",
"dim2": "1",
"dim3": "2",
"cost": 59
}
},
{
"_index": "twitter",
"_type": "_doc",
"_id": "3",
"_score": 0.2876821,
"_source": {
"user": "user3",
"dim1": "2",
"dim2": "1",
"dim3": "3",
"cost": 12
}
}
]
},
"aggregations": {
"group_info": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "2",
"doc_count": 1,
"cost_sum": {
"value": 12
}
},
{
"key": "3",
"doc_count": 1,
"cost_sum": {
"value": 59
}
}
]
}
}
}

The QueryBuilders class (note: the class, not the QueryBuilder interface) has static factory methods that make the code a bit more concise; a sketch follows.
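For example, the BoolQueryBuilder/TermQueryBuilder code above can be written as (hit handling omitted):

// import org.elasticsearch.index.query.QueryBuilders;
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .query(QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("dim1", "1")));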
When you need counts (count, count distinct), use a terms aggregation.
Let's add one more record to _doc:

PUT twitter/_doc/7
{
"user" : "user1",
"dim1" : "1",
"dim2" : "2",
"dim3" : "3",
"cost" : 35
}

Now there are 2 records with user1, dim1=1, dim2=2, dim3=3.

Suppose we want the equivalent of
select dim1, count(*) from twitter group by dim1

GET /twitter/_search
{

"aggs": {
"group_info": {
"terms": {
"field": "dim1.keyword"
},
"aggs": {
"cost_sum": {
"terms": {
"field": "dim1.keyword"
}
}
}
}
}
}
The aggregations part of the response:
"aggregations": {
"group_info": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "1",
"doc_count": 3,
"cost_sum": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "1",
"doc_count": 3
}
]
}
},
{
"key": "2",
"doc_count": 2,
"cost_sum": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "2",
"doc_count": 2
}
]
}
},
{
"key": "3",
"doc_count": 2,
"cost_sum": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "3",
"doc_count": 2
}
]
}
}
]
}
}

When using the REST API, note that response.getAggregations() does not directly hand you the values you want: you must first get the named aggregation and then iterate over its buckets to read the results.
@org.junit.Test
public void test1() throws IOException {

    RestClientBuilder restClientBuilder =
            RestClient.builder(new HttpHost("10.227.26.77", 9200, "HTTP"))
                    // request retry max total timeout
                    .setMaxRetryTimeoutMillis(300000)
                    .setRequestConfigCallback(
                            (requestConfigBuilder) ->
                                    requestConfigBuilder
                                            // socket connect established timeout
                                            .setConnectTimeout(10000)
                                            // data package interval timeout
                                            .setSocketTimeout(120000)
                                            // timeout of requesting a connection from the connection manager
                                            .setConnectionRequestTimeout(60000));
    RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);
    Map<String, Object> settings = new HashMap<>();
    settings.put("ignore_unavailable", true);
    IndicesOptions indicesOptions = IndicesOptions.fromMap(settings, SearchRequest.DEFAULT_INDICES_OPTIONS);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();


    TermsAggregationBuilder aggregation= AggregationBuilders.terms("aggregation_value_name").field("dim1.keyword");
    searchSourceBuilder.aggregation(aggregation);
    SearchRequest searchRequest = new SearchRequest()
            .indicesOptions(indicesOptions)
            .indices("twitter")
            .types("_doc")
            .source(searchSourceBuilder)
            .scroll(TimeValue.timeValueSeconds(60));
    SearchResponse response = client.search(searchRequest);
    ParsedStringTerms parsedStringTerms  = response.getAggregations().get("aggregation_value_name");
    for (Terms.Bucket bucket : parsedStringTerms.getBuckets()) {
        String key = bucket.getKey().toString();
        long docCount = bucket.getDocCount();
        System.out.println(key+" "+docCount);
    }
    for(Aggregation aggregation1:response.getAggregations()){
        System.out.println(aggregation1.getType());
        System.out.println(aggregation1.getName());
    }
}

To count distinct values of a field, use a cardinality aggregation:
select dim1, count(distinct user) from twitter group by dim1

GET /twitter/_search
{

"aggs": {
"group_info": {
"terms": {
"field": "dim1.keyword"
},
"aggs": {
"distinct_user_count": {
"cardinality": {
"field": "user.keyword"
}
}
}
}
}
}

In the corresponding REST API code, call subAggregation on the TermsAggregationBuilder to specify the aggregation applied within each group (the example below uses a sum over cost; a cardinality version is sketched after it).

    TermsAggregationBuilder aggregation= AggregationBuilders.terms("aggregation_value_name").field("dim1.keyword").subAggregation(AggregationBuilders.sum("cost_sum").field("cost"));
    searchSourceBuilder.aggregation(aggregation);
    SearchRequest searchRequest = new SearchRequest()
            .indicesOptions(indicesOptions)
            .indices("twitter")
            .types("_doc")
            .source(searchSourceBuilder)
            .scroll(TimeValue.timeValueSeconds(60));
    SearchResponse response = client.search(searchRequest);
    ParsedStringTerms parsedStringTerms  = response.getAggregations().get("aggregation_value_name");
    for (Terms.Bucket bucket : parsedStringTerms.getBuckets()) {
        String key = bucket.getKey().toString();
        long docCount = bucket.getDocCount();
        ParsedSum sum =bucket.getAggregations().get("cost_sum");
        System.out.println(key+" "+docCount+" "+sum.getValue());
    }
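A sketch of the cardinality version (same client setup as above; in the 6.x high-level client the parsed result type is ParsedCardinality):

// import org.elasticsearch.search.aggregations.metrics.cardinality.ParsedCardinality;
TermsAggregationBuilder groupByDim1 = AggregationBuilders.terms("group_info")
        .field("dim1.keyword")
        .subAggregation(AggregationBuilders.cardinality("distinct_user_count").field("user.keyword"));
searchSourceBuilder.aggregation(groupByDim1);
// build and execute the SearchRequest exactly as above, then:
ParsedStringTerms groups = response.getAggregations().get("group_info");
for (Terms.Bucket bucket : groups.getBuckets()) {
    ParsedCardinality distinctUsers = bucket.getAggregations().get("distinct_user_count");
    System.out.println(bucket.getKey() + " " + distinctUsers.getValue());
}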

Bulk operations

bulk is strict about the JSON format: each JSON document must stay on a single line (no line breaks inside it), and adjacent JSON documents must be separated by a newline (\n on Linux, \r\n on Windows). Every bulk action needs a pair of JSON lines, except delete.

{ action: { metadata }}
{ request body }
{ action: { metadata }}
{ request body }

POST _bulk
{"create": {"_index": "twitter", "_type": "_doc", "_id": 8}}
{"user": "user2", "dim1":"1","dim2":"2","dim3":1,"cost":100}
{"create": {"_index": "twitter", "_type": "_doc", "_id": 9}}
{"user": "user3", "dim1":"1","dim2":"2","dim3":1,"cost":110}

A BulkRequest is composed of multiple IndexRequests:
BulkRequest bulkRequest = new BulkRequest();
Map<String, String> map = new HashMap<>();
map.put("user", "user1");
map.put("dim1", "1");
map.put("dim2", "2");
map.put("dim3", "3");
map.put("cost", "42");

IndexRequest indexRequest =
        new IndexRequest("twitter", "_doc", "10")
                .source(map);
bulkRequest.add(indexRequest);

BulkResponse bulkResponse = client.bulk(bulkRequest);
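A sketch of checking the bulk result for per-item failures:

// import org.elasticsearch.action.bulk.BulkItemResponse;
if (bulkResponse.hasFailures()) {
    // BulkResponse is iterable over the per-item results
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            System.out.println(item.getId() + " failed: " + item.getFailureMessage());
        }
    }
}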

ES composite data structures: ES stores data as JSON, so any field under properties can itself be an object with its own nested properties.
PUT /lib1
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 0
},
"mappings": {
"cost_info": {
"dynamic": true,
"properties": {
"user": {
"properties": {
"name": {
"type": "text"
},
"age": {
"type": "integer"
},
"country": {
"type": "text"
}
}
},
"cost": {
"type": "double"
}
}
}
}
}
Data can now be inserted as ordinary nested JSON:

PUT lib1/cost_info/1
{
"user" : {"name":"zhuzhu","age":25,"country":"cn"},
"cost" : 59
}

PUT lib1/cost_info/2
{
"user" : {"name":"ts","age":20,"country":"us"},
"cost" : 159
}
Nested fields are accessed with the usual dotted form, e.g. user.name.
Suppose we want the average of user.age:

GET lib1/cost_info/_search
{
"aggs": {
"user_age-avg": {
"avg": {
"field": "user.age"
}
}
}
}

Installing Logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz

Hello World
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
Type Hello World and press Enter; after a moment you will see the output:
{
"message" => "Hello World",
"host" => "n227-026-077",
"@version" => "1",
"@timestamp" => 2021-06-02T05:39:13.656Z
}
To quit, press Ctrl+C.
The -f flag points to a configuration file; the command above is equivalent to running with this config:
input {
stdin { }
}
output {
stdout { }
}

Suppose we want the input to come from log files in a directory:
input {
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
A standard configuration file has three sections:
# Input
input {
...
}

# Filter

filter {
...
}

# Output

output {
...
}
Suppose there are many log files under /home/tangsong.math/repos/logstash_logs, with names like spring.log.2021-06-01_18 and content in the format shown further below, and we want to store them in ES. Looking into it, Kibana ships with a Grok Debugger since ES 6.3, and the grok pattern is not easy to write (the built-in patterns may not be enough):
https://grokconstructor.appspot.com/do/match#result
In fact, Logstash has a plugin that directly handles log4j data received over a TCP port. Run
bin/logstash-plugin install logstash-input-log4j
In newer versions, Filebeat is actually the recommended way to collect log4j logs (testing showed that the configuration below did not collect any logs).

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.4-linux-x86_64.tar.gz
Its configuration file is filebeat.yml.

In filebeat.yml, change output.elasticsearch to output.logstash:

output.logstash:
  hosts: ["localhost:5044"]

and enable the log prospector under filebeat.prospectors (each - entry is one prospector), pointing paths at the files to collect:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /home/tangsong.math/repos/filebeat-6.2.4-linux-x86_64/*.log
(It seems Filebeat can only collect local files; maybe it can also collect from HDFS? In practice, Filebeat is usually deployed on the machines being collected, configured with output.kafka, and Logstash consumes from Kafka and writes to ES.)
Let's mock a log file, mock.log:
[DEBUG] 2021-06-03 01:47:06,698 method:LogstashTest.test1(LogstashTest.java:10)
This is a debug message!
[INFO ] 2021-06-03 01:47:06,710 method:LogstashTest.test1(LogstashTest.java:11)
This is info message!
[WARN ] 2021-06-03 01:47:06,710 method:LogstashTest.test1(LogstashTest.java:12)
This is a warn message!
[ERROR] 2021-06-03 01:47:06,711 method:LogstashTest.test1(LogstashTest.java:13)
This is error message!
[ERROR] 2021-06-03 01:47:06,711 method:LogstashTest.test1(LogstashTest.java:18)
java.lang.ArithmeticException: / by zero

Create a logstash.conf in the Logstash directory.
Note that the address must not be 127.0.0.1, otherwise logs sent from other hosts will not be received.
input {
log4j {
host => "10.227.26.77"
port => 4560
}
}

output {
stdout {
codec => rubydebug
}
elasticsearch{
hosts => ["10.227.26.77:9200"]
index => "logstash_log4j_%{+YYYY-MM-dd}"
document_type => "log4j_type"
}
}

Then start Logstash:
bin/logstash -f logstash.conf

Add the log4j dependency in Maven:
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

log4j.properties under the resources directory is as follows:
log4j.rootLogger = debug,stdout,logstash

# Output to the console

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

# Output logs to Logstash

log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.RemoteHost=10.227.26.77
log4j.appender.logstash.port=4560
log4j.appender.logstash.ReconnectionDelay=60000
log4j.appender.logstash.LocationInfo=true

import org.apache.log4j.Logger;
import org.junit.Test;

public class LogstashTest {
    public static final Logger logger = Logger.getLogger(LogstashTest.class);

    @Test
    public void test1() {
        logger.debug("This is a debug message!");
        logger.info("This is info message!");
        logger.warn("This is a warn message!");
        logger.error("This is error message!");

        try {
            System.out.println(5 / 0);
        } catch (Exception e) {
            logger.error(e);
        }
    }
}