http://blog.csdn.net/alphags/article/details/52862578?locationNum=10&fps=1


本文内容主要参考自Apache Flume用户文档(http://flume.apache.org/FlumeUserGuide.html),由于关于Apache Flume 1.X的中文参考资料不是很多,所以这里将我部署的过程记录下来,希望能给有同样需要的人们有一些提示作用。 
(英文文档的内容很多,这里只写一些我自己用到的)

Overview

Apache Flume 是一个高效的分布式日志收集系统,可以将大量的日志数据从不同的数据源集中到一起。(PS:知道这些就够了)

System Requirments

1、JDK 1.7+ 
2、充足的内存 
3、磁盘还有可用空间 
4、对有相应目录的读写权限

数据流模型

 
从图上可以看到,每一个Agent 包含一个Source 一个channel 一个sink 
source 可以理解为数据源(日志文件、AVRO、… 有很多 看文档就能知道,我只用到了文件) 
sink 可以理解为数据目的地(同样也有很多,我测试环境也是直接写到文件) 
channel 可以理解为数据流管道(种类也有不少,文档中给的例子是用的内存,但是内存是不稳定的,所以我的测试环境也换成了文件) 
简单表述三者作用(不严谨表述):source 读取日志数据将其写入channel中,sink从channel中读取数据然后写到其指定的地方。这里如果sink写失败,那么数据就会一直在channel中堆积直到sink恢复正常(这样就确保了日志数据不会丢失) 
多个 Apache Flume Agent 还可以连接在一起,模型如下图所示 

知道了以上内容就可以开始搭建测试环境了

硬件环境

三台服务器IP地址分别为192.168.0.101~103,全部使用linux ubuntu 12.04 server 操作系统

系统结构

安装过程

  wget  http://mirrors.cnnic.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz  #下载压缩包
  tar -xvzf apache-flume*.tar.gz  #解压
  mv  apache-flume /data/local/flume #本人比较喜欢把把软件安装在/data/local目录下 
  • 1
  • 2
  • 3

配置

这里我直接把我本地的配置发上来

# flume.conf: A Flume configuration

# Agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source 配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/logs/system.log

# sink 配置
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.0.101
a1.sinks.k1.port=4545

# channel 配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/data/logs/channels/a1/checkpoint
a1.channels.c1.dataDirs = /data/logs/channels/a1/data

# 绑定source、single到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#ageng a2
a2.sources=r2
a2.sinks=k2
a2.channels=c2
#a2 source 配置
a2.sources.r2.type=avro
a2.sources.r2.bind=192.168.0.101
a2.sources.r2.port=4545

#a2 sink 配置 将合并后的日志数据写到/data/local/collector目录下
a2.sinks.k2.type = file_roll
a2.sinks.k2.sink.directory = /data/local/collector
a2.sinks.k2.sink.rollInterval=3600



##下面是注释掉的代码为配置sink ,日志按天合并后保存到单独的文件夹中
#a2.sinks.k2.type=hdfs
#a2.sinks.k2.hdfs.path=hdfs://hadoop-master:9000/events/%y-%m-%d
#a2.sinks.k2.hdfs.filePrefix=events-
#a2.sinks.k2.hdfs.rollInterval=0
#a2.sinks.k2.hdfs.rollSize=0
#a2.sinks.k2.hdfs.rollCount=0
#a2.sinks.k2.hdfs.useLocalTimeStamp=true

#a2 channel配置
a2.channels.c2.type = file
a2.channels.c2.checkpointDir=/data/logs/channels/a4/checkpoint
a2.channels.c2.dataDirs = /data/logs/channels/a4/data

# 绑定source、single到channel上
a2.sources.r2.channels=c4
a2.sinks.k2.channel=c4
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

说明:

运行

在192.168.0.101上使用下面命令启动Agent2(a2)

bin/flume-ng agent --conf conf --conf-file ./conf/flume.conf --name a2 -Dflume.root.logger=INFO,console
  • 1

在192.168.0.101另启动一个终端,运行下面命令启动Agent1(a1)

bin/flume-ng agent --conf conf --conf-file ./conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
  • 1

分别在192.168.0.102~103上同样启动Agent1(a1)

bin/flume-ng agent --conf conf --conf-file ./conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
  • 1

生成测试数据的python小程序

#!/usr/bin/python
import os
import random
from time import ctime,sleep
for i in range(1,1000000):
        smil=random.randint(50,100)
        print smil/1000.0
        com="echo \"hello message from 202\t"+str(i)+"\">> /data/logs/system.log";
        print com
        os.system(com)
        sleep(smil/1000.0)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

测试结果

可以看到在目录/data/logs/collector 目录下生成了多个合并后的日志文件

PS:

建议大家阅读Flume的文档,虽然英文读起来比较累但是大家搞技术的谁不懂点英文呢,所以下点功夫还是能看懂的。 
如果通过本文仍不能理解Flume的工作流程,建议大家学习文档中给出的 A simple example 。(下面作个简单解释)

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

原文:This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console. 
译:这个配置定义了一个名称为a1的agent。a1有一个source 监听端口44444 (接收44444端口的任何收到的数据),一个内存管道(channel)将所有接收到的数据都暂存在内存当中,一个sink 将收到的数据打印到控台。

 $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
  • 1

执行上述命令后在另外一个命令窗口执行(没有telnet? apt-get install telnet 或者yum install telnet 安装 )

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

我们可以看到Flume 的控台打印出

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }
  • 1
  • 2
  • 3
版权声明:本文为博主原创文章,未经博主允许不得转载。