一、前置需要
jdk,zookeeper,storm1.1.1版本
二、安装logstash2.4.1
进入 https://www.elastic.co/cn/downloads/past-releases/logstash-2-4-1
下载tar.gz格式的文件即可。
三、简单使用logstash
官方文档:https://www.elastic.co/guide/en/logstash/2.4/installing-logstash.html
使用1
cd logstash-2.4.0
#表示采集控制台的数据,控制台输入,控制台输出
bin/logstash -e 'input { stdin { } } output { stdout {} }'
结果:
使用2
# 输入的是数据,返回的是数据的json串
bin/logstash -e 'input{stdin{}}output{stdout{codec=>json}}'
四:编写配置文件conf
监控控制台输入,控制台输出的文件
# 新建配置文件
vi test.conf
# 指定输入输出,编写test.conf
input{
stdin{}
}
output{
stdout{}
}
# 运行test.conf中的命令
bin/logstash -f test.conf
查看结果:
监控文件输入,控制台输出
# 编写配置文件
vi file_stdout.conf
input{
file {
path => "/home/hadoop/app/logstash-2.4.1/logstash.txt"
}
}
output{
stdout{
codec => json
}
}
#启动logstash
bin/logstash -f file_stdout.conf
另开一个窗口向logstash.txt增加内容
监控logstash的文件返回一个json串
五、安装kafka对接logstash
启动kafka,编写logstash配置文件
kafka常见命令
(1)查看当前服务器中的所有topic,zk01为主机名
kafka-topics.sh --list --zookeeper zk01:2181
(2)创建topic:partitions的分片为3,replication-factor 的备份数量1
kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 3 --topic first
(3)删除topic
kafka-topics.sh --delete --zookeeper zk01:2181 --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
(4)生产者与消费者问题:
4.1 通过shell命令发送消息:连接kafka的 kafka01:9092 的topic为first发送消息
kafka-console-producer.sh --broker-list localhost:9092 --topic first
4.2 过shell消费消息(新版本kafka的消费者使用方式,与旧版本不同)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning
启动kafka
zkServer.sh start
bin/kafka-server-start.sh config/server.properties
创建一个topic名为logstash
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic logstash
编写logstash的配置文件
# 新建配置文件
vi file_kafka.conf
input{
file {
path => "/home/hadoop/app/logstash-2.4.1/logstash.txt"
}
}
output{
kafka{
topic_id => "logstash"
bootstrap_servers => "localhost:9092"
}
}
启动consumer监控logstash主题
kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181 --topic logstash --from-beginning
启动logstash
bin/logstash -f file_kafka.conf
测试
echo “hello” => logstash.txt
kafka检测到logstash的消息