Spark Streaming Real-Time Stream Processing Project 1: Learning Flume, a Distributed Log Collection Framework

Spark Streaming Real-Time Stream Processing Project 2: Learning Kafka, a Distributed Message Queue

Spark Streaming Real-Time Stream Processing Project 3: Integrating Flume and Kafka for Real-Time Data Collection

Spark Streaming Real-Time Stream Processing Project 4: Setting Up the Hands-On Environment

Spark Streaming Real-Time Stream Processing Project 5: Getting Started with Spark Streaming

Spark Streaming Real-Time Stream Processing Project 6: Spark Streaming in Action, Part 1

Spark Streaming Real-Time Stream Processing Project 7: Spark Streaming in Action, Part 2

Spark Streaming Real-Time Stream Processing Project 8: Integrating Spark Streaming with Flume

Spark Streaming Real-Time Stream Processing Project 9: Integrating Spark Streaming with Kafka in Practice

Spark Streaming Real-Time Stream Processing Project 10: Developing a Log Generator and Outputting Logs with log4j

Spark Streaming Real-Time Stream Processing Project 11: Comprehensive Hands-On Project

Source code

First, add the dependency:

<!-- Dependency for integrating Spark Streaming with Flume -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

Integration approach 1: Push (Flume pushes data to Spark Streaming)

Flume configuration file flume_push_streaming.conf (a netcat source listening on hadoop0:44444, a memory channel, and an avro sink that pushes events to hadoop0:41414, where the Spark Streaming receiver will be listening):

simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind= hadoop0
simple-agent.sources.netcat-source.port= 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname= hadoop0
simple-agent.sinks.avro-sink.port= 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel

Code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc Spark Streaming integration with Flume - push-based approach
  * @create 2019-02-21 10:04
  */
object FlumePushWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    //Push-based integration: Spark Streaming starts an Avro receiver on hadoop0:41414 and Flume's avro sink pushes events to it
    val flumeStream = FlumeUtils.createStream(ssc,"hadoop0",41414)

    //Each Flume event body holds the raw bytes of one input line; decode it and run a word count
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Run the Spark Streaming program first, then start Flume.

Command to start Flume: flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_push_streaming.conf -Dflume.root.logger=INFO,console

Test and verify: telnet hadoop0 44444
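For example, typing a line in the telnet session (the Flume netcat source acknowledges each accepted line with OK by default) should make word counts for that 5-second batch appear in the Spark console; the timestamp below is only illustrative:

[root@hadoop0 ~]# telnet hadoop0 44444
hello world hello
OK

-------------------------------------------
Time: 1550718100000 ms
-------------------------------------------
(hello,2)
(world,1)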

Improved version: hard-coding the hostname and port in the program makes it inflexible, so the Spark Streaming code is upgraded to take them as command-line arguments.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc Spark Streaming integration with Flume - push-based approach
  * @create 2019-02-21 10:04
  */
object FlumePushWordCount {

    def main(args: Array[String]): Unit = {

        //Validate the arguments: a hostname and a port are required
        if(args.length != 2){
            System.err.println("Usage: FlumePushWordCount <hostname> <port>")
            System.exit(1)
        }

        //The hostname and port come from the program arguments
        val Array(hostname,port) = args

        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
        val ssc = new StreamingContext(sparkConf,Seconds(5))

        //Push-based integration, with the receiver hostname and port taken from the arguments
        val flumeStream = FlumeUtils.createStream(ssc,hostname,port.toInt)

        flumeStream.map(x => new String(x.event.getBody.array()).trim)
            .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

How do you pass these arguments when running in IDEA? Open Run > Edit Configurations, select the run configuration, and enter them in the Program arguments field, e.g. hadoop0 41414.
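If the job is later submitted with spark-submit instead of run from IDEA, the same two arguments simply follow the application jar. A rough sketch, assuming Spark 2.2.0 and a placeholder jar path (spark-streaming-flume is not bundled with the Spark distribution, so --packages is used to pull it in):

spark-submit --master local[2] \
  --class FlumePushWordCount \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
  /path/to/your-app.jar hadoop0 41414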

Integration approach 2: Pull (Spark Streaming pulls data from a custom Flume sink)

Add two more dependencies (spark-streaming-flume-sink provides the custom SparkSink, and commons-lang3 is one of its dependencies):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.5</version>
</dependency>
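Because the SparkSink runs inside the Flume agent in the pull approach, the spark-streaming-flume-sink jar and its commons-lang3 dependency must also be available on Flume's classpath, i.e. copied into $FLUME_HOME/lib. A rough sketch, assuming the jars already sit in the local Maven repository and that the Spark version is 2.2.0:

cp ~/.m2/repository/org/apache/spark/spark-streaming-flume-sink_2.11/2.2.0/spark-streaming-flume-sink_2.11-2.2.0.jar $FLUME_HOME/lib/
cp ~/.m2/repository/org/apache/commons/commons-lang3/3.5/commons-lang3-3.5.jar $FLUME_HOME/lib/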

Flume configuration file flume_pull_streaming.conf (the avro sink is replaced with a SparkSink on hadoop0:41414 that buffers events until Spark Streaming pulls them):

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind= hadoop0
simple-agent.sources.netcat-source.port= 44444

simple-agent.sinks.spark-sink.type =org.apache.spark.streaming.flume.sink.SparkSink

simple-agent.sinks.spark-sink.hostname= hadoop0
simple-agent.sinks.spark-sink.port= 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel

Code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc Spark Streaming integration with Flume - pull-based approach
  * @create 2019-02-21 10:04
  */
object FlumePullWordCount {

    def main(args: Array[String]): Unit = {

        //Validate the arguments: a hostname and a port are required
        if(args.length != 2){
            System.err.println("Usage: FlumePullWordCount <hostname> <port>")
            System.exit(1)
        }

        //The hostname and port come from the program arguments
        val Array(hostname,port) = args

        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
        val ssc = new StreamingContext(sparkConf,Seconds(5))

        //Pull-based integration: a polling receiver pulls events from Flume's SparkSink
        val flumeStream = FlumeUtils.createPollingStream(ssc,hostname,port.toInt)

        flumeStream.map(x => new String(x.event.getBody.array()).trim)
            .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

Start Flume first, then the Spark Streaming program (the SparkSink must be running before the polling receiver can connect to it):

 flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_pull_streaming.conf -Dflume.root.logger=INFO,console

Exception encountered: java.lang.IllegalStateException: begin() called when transaction is OPEN!

This is caused by a jar conflict in Flume's lib directory: the scala-library version there does not match. Find a scala-library jar in the Maven repository that matches the Scala version installed locally and replace the existing one with it; in my case the local environment uses scala-library-2.11.8.jar.
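A rough sketch of that replacement; the 2.10.x file name below is only an example, so check what is actually present first:

ls $FLUME_HOME/lib | grep scala-library
mv $FLUME_HOME/lib/scala-library-2.10.5.jar $FLUME_HOME/lib/scala-library-2.10.5.jar.bak   # example file name, use whatever version is actually there
cp ~/.m2/repository/org/scala-lang/scala-library/2.11.8/scala-library-2.11.8.jar $FLUME_HOME/lib/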

For some reason I could not pin down, the telnet command kept failing with the following error while testing this example:

[root@hadoop0 soft]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
telnet: connect to address 127.0.0.1: Connection refused

Possibly the netcat source was only reachable via the hadoop0 address rather than 127.0.0.1, so connections to localhost were refused. As a workaround I changed the Flume source to tail a file for newly appended lines instead; the configuration is as follows:

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = exec
simple-agent.sources.netcat-source.command = tail -F /soft/flume1.6/data/data.log
simple-agent.sources.netcat-source.shell = /bin/sh -c

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop0
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel

The Spark Streaming program stays the same. Start Flume first, then run the Spark program, append data to data.log, and the correct word counts show up in the console.
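For example, appending a line such as the following should make the corresponding (word,count) tuples show up in the Spark console within one 5-second batch:

echo "hello spark hello flume" >> /soft/flume1.6/data/data.log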