一、前置需要

jdk,zookeeper,storm1.1.1版本

二、安装logstash2.4.1

进入 https://www.elastic.co/cn/downloads/past-releases/logstash-2-4-1
下载tar.gz格式的文件即可。

三、简单使用logstash

官方文档:https://www.elastic.co/guide/en/logstash/2.4/installing-logstash.html

使用1

cd logstash-2.4.0

#表示采集控制台的数据,控制台输入,控制台输出
bin/logstash -e 'input { stdin { } } output { stdout {} }'  

结果:

使用2

# 输入的是数据,返回的是数据的json串
bin/logstash -e 'input{stdin{}}output{stdout{codec=>json}}'

四:编写配置文件conf

监控控制台输入,控制台输出的文件

# 新建配置文件
vi test.conf

# 指定输入输出,编写test.conf
input{
stdin{}
}
output{
stdout{}
}

# 运行test.conf中的命令
bin/logstash -f test.conf

查看结果:

监控文件输入,控制台输出

# 编写配置文件
vi file_stdout.conf

input{
 file {
        path => "/home/hadoop/app/logstash-2.4.1/logstash.txt"
    }
}

output{
 stdout{
 codec => json
 }
}

#启动logstash
bin/logstash -f file_stdout.conf

另开一个窗口向logstash.txt增加内容

监控logstash的文件返回一个json串

五、安装kafka对接logstash

启动kafka,编写logstash配置文件

kafka常见命令

(1)查看当前服务器中的所有topic,zk01为主机名
kafka-topics.sh --list --zookeeper  zk01:2181

(2)创建topic:partitions的分片为3,replication-factor 的备份数量1 
kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 3 --topic first

(3)删除topic
kafka-topics.sh --delete --zookeeper zk01:2181 --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

(4)生产者与消费者问题:
4.1 通过shell命令发送消息:连接kafka的 kafka01:9092 的topic为first发送消息
kafka-console-producer.sh --broker-list localhost:9092 --topic first
4.2 过shell消费消息(新版本kafka的消费者使用方式,与旧版本不同)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning

启动kafka

zkServer.sh start
bin/kafka-server-start.sh  config/server.properties

创建一个topic名为logstash

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic logstash

编写logstash的配置文件

# 新建配置文件
vi file_kafka.conf

input{
 file {
        path => "/home/hadoop/app/logstash-2.4.1/logstash.txt"
    }
}

output{
 kafka{
 topic_id => "logstash"
 bootstrap_servers => "localhost:9092"
 }
}

启动consumer监控logstash主题

 kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181 --topic logstash --from-beginning

启动logstash

bin/logstash -f file_kafka.conf

测试

echo “hello” => logstash.txt

kafka检测到logstash的消息