http://blog.csdn.net/alphags/article/details/52862578?locationNum=10&fps=1
本文内容主要参考自Apache Flume用户文档(http://flume.apache.org/FlumeUserGuide.html),由于关于Apache Flume 1.X的中文参考资料不是很多,所以这里将我部署的过程记录下来,希望能给有同样需要的人们有一些提示作用。
(英文文档的内容很多,这里只写一些我自己用到的)
Overview
Apache Flume 是一个高效的分布式日志收集系统,可以将大量的日志数据从不同的数据源集中到一起。(PS:知道这些就够了)
System Requirments
1、JDK 1.7+
2、充足的内存
3、磁盘还有可用空间
4、对有相应目录的读写权限
数据流模型
从图上可以看到,每一个Agent 包含一个Source 一个channel 一个sink
source 可以理解为数据源(日志文件、AVRO、… 有很多 看文档就能知道,我只用到了文件)
sink 可以理解为数据目的地(同样也有很多,我测试环境也是直接写到文件)
channel 可以理解为数据流管道(种类也有不少,文档中给的例子是用的内存,但是内存是不稳定的,所以我的测试环境也换成了文件)
简单表述三者作用(不严谨表述):source 读取日志数据将其写入channel中,sink从channel中读取数据然后写到其指定的地方。这里如果sink写失败,那么数据就会一直在channel中堆积直到sink恢复正常(这样就确保了日志数据不会丢失)
多个 Apache Flume Agent 还可以连接在一起,模型如下图所示
知道了以上内容就可以开始搭建测试环境了
硬件环境
三台服务器IP地址分别为192.168.0.101~103,全部使用linux ubuntu 12.04 server 操作系统
系统结构
安装过程
wget http://mirrors.cnnic.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz #下载压缩包
tar -xvzf apache-flume*.tar.gz #解压
mv apache-flume /data/local/flume #本人比较喜欢把把软件安装在/data/local目录下
- 1
- 2
- 3
配置
这里我直接把我本地的配置发上来
# flume.conf: A Flume configuration
# Agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source 配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/logs/system.log
# sink 配置
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.0.101
a1.sinks.k1.port=4545
# channel 配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/data/logs/channels/a1/checkpoint
a1.channels.c1.dataDirs = /data/logs/channels/a1/data
# 绑定source、single到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#ageng a2
a2.sources=r2
a2.sinks=k2
a2.channels=c2
#a2 source 配置
a2.sources.r2.type=avro
a2.sources.r2.bind=192.168.0.101
a2.sources.r2.port=4545
#a2 sink 配置 将合并后的日志数据写到/data/local/collector目录下
a2.sinks.k2.type = file_roll
a2.sinks.k2.sink.directory = /data/local/collector
a2.sinks.k2.sink.rollInterval=3600
##下面是注释掉的代码为配置sink ,日志按天合并后保存到单独的文件夹中
#a2.sinks.k2.type=hdfs
#a2.sinks.k2.hdfs.path=hdfs://hadoop-master:9000/events/%y-%m-%d
#a2.sinks.k2.hdfs.filePrefix=events-
#a2.sinks.k2.hdfs.rollInterval=0
#a2.sinks.k2.hdfs.rollSize=0
#a2.sinks.k2.hdfs.rollCount=0
#a2.sinks.k2.hdfs.useLocalTimeStamp=true
#a2 channel配置
a2.channels.c2.type = file
a2.channels.c2.checkpointDir=/data/logs/channels/a4/checkpoint
a2.channels.c2.dataDirs = /data/logs/channels/a4/data
# 绑定source、single到channel上
a2.sources.r2.channels=c4
a2.sinks.k2.channel=c4
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
说明:
运行
在192.168.0.101上使用下面命令启动Agent2(a2)
bin/flume-ng agent --conf conf --conf-file ./conf/flume.conf --name a2 -Dflume.root.logger=INFO,console
- 1
在192.168.0.101另启动一个终端,运行下面命令启动Agent1(a1)
bin/flume-ng agent --conf conf --conf-file ./conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
- 1
分别在192.168.0.102~103上同样启动Agent1(a1)
bin/flume-ng agent --conf conf --conf-file ./conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
- 1
生成测试数据的python小程序
#!/usr/bin/python
import os
import random
from time import ctime,sleep
for i in range(1,1000000):
smil=random.randint(50,100)
print smil/1000.0
com="echo \"hello message from 202\t"+str(i)+"\">> /data/logs/system.log";
print com
os.system(com)
sleep(smil/1000.0)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
测试结果
可以看到在目录/data/logs/collector 目录下生成了多个合并后的日志文件
PS:
建议大家阅读Flume的文档,虽然英文读起来比较累但是大家搞技术的谁不懂点英文呢,所以下点功夫还是能看懂的。
如果通过本文仍不能理解Flume的工作流程,建议大家学习文档中给出的 A simple example 。(下面作个简单解释)
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
原文:This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console.
译:这个配置定义了一个名称为a1的agent。a1有一个source 监听端口44444 (接收44444端口的任何收到的数据),一个内存管道(channel)将所有接收到的数据都暂存在内存当中,一个sink 将收到的数据打印到控台。
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
- 1
执行上述命令后在另外一个命令窗口执行(没有telnet? apt-get install telnet 或者yum install telnet 安装 )
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
- 1
- 2
- 3
- 4
- 5
- 6
我们可以看到Flume 的控台打印出
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
- 1
- 2
- 3