http://blog.csdn.net/u010695420/article/details/44978079


Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合,最后存储到一个中心化数据存储系统中,方便进行数据分析。事实上flume也可以收集其他信息,不仅限于日志。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。相比较而言,flume NG更简单更易于管理操作。 
Flume OG:Flume original generation 即Flume 0.9.x版本 
Flume NG:Flume next generation ,即Flume 1.x版本。 
Flume NG用户参考手册:http://flume.apache.org/FlumeUserGuide.html# 
Flume1.5.2下载地址: 
http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz 
简单比较一下两者的区别: 
OG有三个组件agent、collector、master,agent主要负责收集各个日志服务器上的日志,将日志聚合到collector,可设置多个collector,master主要负责管理agent和collector,最后由collector把收集的日志写的HDFS中,当然也可以写到本地、给storm、给Hbase。 
 
NG最大的改动就是不再有分工角色设置,所有的都是agent,可以彼此之间相连,多个agent连到一个agent,此agent也就相当于collector了,NG也支持负载均衡,具体参见http://shiyanjun.cn/archives/915.html。 
 
现在我们准备了三台机器(虚拟机也可以,但出现的问题也会很多,比如内存空间不足,网络连接问题,此处已哭晕)。 
操作系统:CentOS 6.6 
Flume版本:1.5.2 
设置三台机器的IP地址分别为192.168.1.105,192.168.1.106,192.168.1.107,记得关闭防火墙: 
命令service iptables stop 
环境准备好之后,在flume/conf/下新建agent.conf文件,105和106配置一样,如下:

#Agent
flumeAgent.channels = c1
flumeAgent.sources  = s1
flumeAgent.sinks    = k1 
#flumeAgent Spooling Directory Source
#注(1)
flumeAgent.sources.s1.type = spooldir
flumeAgent.sources.s1.spoolDir  =/usr/logs/
flumeAgent.sources.s1.fileHeader = true
flumeAgent.sources.s1.deletePolicy =immediate
flumeAgent.sources.s1.batchSize =1000
flumeAgent.sources.s1.channels =c1
flumeAgent.sources.s1.deserializer.maxLineLength =1048576
#flumeAgent FileChannel
#注(2)
flumeAgent.channels.c1.type = file
flumeAgent.channels.c1.checkpointDir = /var/flume/spool/checkpoint
flumeAgent.channels.c1.dataDirs = /var/flume/spool/data
flumeAgent.channels.c1.capacity = 200000000
flumeAgent.channels.c1.keep-alive = 30
flumeAgent.channels.c1.write-timeout = 30
flumeAgent.channels.c1.checkpoint-timeout=600
# flumeAgent Sinks
#注(3)
flumeAgent.sinks.k1.channel = c1
flumeAgent.sinks.k1.type = avro
# connect to CollectorMainAgent
flumeAgent.sinks.k1.hostname = 192.168.1.107
flumeAgent.sinks.k1.port = 44444
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

我们设置107机器为汇聚节点,105和106的日志文件都将集中到这里,在flume/conf/下新建consolidatio.conf文件,配置如下:

#flumeConsolidationAgent
flumeConsolidationAgent.channels = c1
flumeConsolidationAgent.sources  = s1
flumeConsolidationAgent.sinks    = k1 

#flumeConsolidationAgent Avro Source
#注(4)
flumeConsolidationAgent.sources.s1.type = avro
flumeConsolidationAgent.sources.s1.channels = c1
flumeConsolidationAgent.sources.s1.bind = 192.168.1.107
flumeConsolidationAgent.sources.s1.port = 44444

#flumeConsolidationAgent FileChannel
flumeConsolidationAgent.channels.c1.type = file
flumeConsolidationAgent.channels.c1.checkpointDir = /var/flume/spool/checkpoint
flumeConsolidationAgent.channels.c1.dataDirs = /var/flume/spool/data
flumeConsolidationAgent.channels.c1.capacity = 200000000
flumeConsolidationAgent.channels.c1.keep-alive = 30
flumeConsolidationAgent.channels.c1.write-timeout = 30
flumeConsolidationAgent.channels.c1.checkpoint-timeout=600

##flumeConsolidationAgent Memory Channel
#flumeConsolidationAgent.channels.c1.type = memory
#flumeConsolidationAgent.channels.c1.capacity = 10000
#flumeConsolidationAgent.channels.c1.transactionCapacity = 10000
#flumeConsolidationAgent.channels.c1.byteCapacityBufferPercentage = 20
#flumeConsolidationAgent.channels.c1.byteCapacity = 800000

#flumeConsolidationAgent Sinks
#注(5)
flumeConsolidationAgent.sinks.k1.channel= c1
flumeConsolidationAgent.sinks.k1.type = file_roll
flumeConsolidationAgent.sinks.k1.sink.directory = /var/tmp
flumeConsolidationAgent.sinks.k1.sink.rollInterval = 3600
flumeConsolidationAgent.sinks.k1.batchSize = 10000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

配置完成后,先启动107,命令行:

bin/flume-ng agent --conf conf --conf-file conf/consolidatio.conf --name flumeConsolidationAgent -Dflume.root.logger=DEBUG,console
  • 1

再启动105和106,命令行:

bin/flume-ng agent --conf conf --conf-file conf/agent.conf --name flumeAgent -Dflume.root.logger=DEBUG,console
  • 1

此处有三种模式运行:INFO、DEBUG、ERROR,以次容错度升高。 
注(1):sources类型为spooldir,监控某一目录下的文件,一旦有文件进入,则收割。被收割的文件不能再打开编辑,此处设置收割完毕后直接删除文件,这儿我出现的一个问题是,直接手动往该目录下拷贝文件一旦文件大小高于20M左右就宕机,提前拷入则正常,后来查到原因是拷贝的的速度远远小于收割的速度,有种文件***作的感觉,所以出错。 
注(2):channel类型为file。 
MemoryChannel: 所有的events被保存在内存中。优点是高吞吐。缺点是容量有限并且Agent死掉时会丢失内存中的数据。 
FileChannel: 所有的events被保存在文件中。优点是容量较大且死掉时数据可恢复。缺点是速度较慢。 
注(3):sink类型为avro,hostname都要指向的consolidation的IP,端口号可以任意设置,只要不冲突就行,这样也会出现问题,当107的flumeConsolidationAgent重新启动后会出现“地址已被占用的”错误,要么修改端口号,要么杀死该进程。 
注(4):consolidation要绑定本机的IP,端口号也要与105和106一致。 
注(5):此处我们将收集的文件存入本地,并没有写入HDFS,因为还要装Hadoop。这里要千万注意所有sinks的channel,比如x.sinks.k1.channel = c1中的channel一定不加s。 
成功后,我们将看到如下信息: 

但是单独使用flume意义不大,需要结合其他工具一起组成大数据架构。 
大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合: 
http://www.aboutyun.com/thread-6855-1-1.html(出处: about云开发) 
Flume+Kafka+Strom基于分布式环境的结合使用: 
http://www.aboutyun.com/thread-8915-1-1.html(出处: about云开发)

欢迎批评指正!