Spark Streaming实时流处理项目1——分布式日志收集框架Flume的学习

Spark Streaming实时流处理项目2——分布式消息队列Kafka学习

Spark Streaming实时流处理项目3——整合Flume和Kafka完成实时数据采集

Spark Streaming实时流处理项目4——实战环境搭建

Spark Streaming实时流处理项目5——Spark Streaming入门

Spark Streaming实时流处理项目6——Spark Streaming实战1

Spark Streaming实时流处理项目7——Spark Streaming实战2

Spark Streaming实时流处理项目8——Spark Streaming与Flume的整合

Spark Streaming实时流处理项目9——Spark Streaming整合Kafka实战

Spark Streaming实时流处理项目10——日志产生器开发并结合log4j完成日志的输出

Spark Streaming实时流处理项目11——综合实战

源码​​​​​​​

案例1——基于Receiver的整合

①启动zookeeper

②启动kafka,同时在四台机器上启动kafka的shell脚本如下:

brokers="hadoop0 hadoop1 hadoop2 hadoop3"
kafka_home="/soft/kafka"

for i in $brokers
do
    echo "Starting kafka on ${i} ... "
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &"
    if [[ $? -ne 0 ]]; then
        echo "Start kafka on ${i} is OK !"
    fi
done
echo kafka kafka are started !
exit 0

③创建一个topic:./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic

④查看是否创建成功:./kafka-topics.sh --list --zookeeper localhost:2181

⑤通过控制台测试本topic是否可用正常的生产和消费数据:

kafka生产者:./kafka-console-producer.sh --broker-list 192.168.25.128:9092 --topic kafka_streaming_topic

换一台机器在创建一个消费者:

kafka消费者:./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic

测试可用。

下面就可以开发Spark Streaming的应用程序啦。

先添加依赖:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

程序:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc  Spark Streaming对接kafka的方式1——基于Receiver的整合
  * @create 2019-02-23 11:28
  */
object KafkaReceiverWordCount {

    def main(args: Array[String]): Unit = {

        //参数通过IDEA传入
        if (args.length != 4){
            System.err.println("Usage:KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
        }

        val Array(zkQuorum,group,topics,numThreads) = args

        val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")

        val ssc = new StreamingContext(sparkConf,Seconds(5))

        val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

        //TODO ..SparkStreaming对接Kafka
        //val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
        val messages = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)

        messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

通过IDEA传递参数:

测试:

所以,本地环境测试通过!

注意,上传到服务器联调测试的时候,要把setAppName("KafkaReceiverWordCount").setMaster("local[2]")给注释掉。使用maven打包上传;提交命令:

spark-submit \

--class com.xidian.spark.KafkaReceiverWordCount \

--master local[2]  \

--name KafkaReceiverWordCount \

--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0  \

/root/Project/spark-1.0.jar hadoop0:2181 test kafka_streaming_topic 1

案例2——基于Direct的整合(No Receivers)

Spark 1.3中引入了这种新的无接收方直接的方法,以确保更好的端到端保证。这种方法不使用接收者来接收数据,而是定期查询Kafka以获得每个主题分区中的最新偏移量,并相应地定义每个批处理中要处理的偏移量范围。在启动处理数据的作业时,Kafka的simple consumer API用于从Kafka读取已定义的偏移量范围(类似于从文件系统读取文件)。注意,这个特性是在Scala和Java API的Spark 1.3以及Python API的Spark 1.4中引入的。

与基于Receiver的方法(即方法1)相比,这种方法具有以下优点。

简化并行性:不需要创建多个输入Kafka流并将它们合并。使用directStream, Spark流将创建尽可能多的RDD分区,因为有Kafka分区要使用,这些分区将并行地从Kafka读取数据。因此Kafka和RDD分区之间存在一对一的映射,这更容易理解和调优。

效率:第一种方法中实现零数据丢失需要将数据存储在写前日志中,这将进一步复制数据。这实际上是低效的,因为数据被有效地复制了两次——一次由Kafka复制,第二次由Write-Ahead日志复制。第二种方法消除了这个问题,因为没有接收方,因此不需要提前写日志。只要您有足够的Kafka保留,就可以从Kafka恢复消息。

精确一次的语义:第一种方法使用Kafka的高级API在Zookeeper中存储消费的偏移量。这是使用Kafka数据的传统方式。虽然这种方法(结合写前日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有些记录可能会被使用两次。这是因为Spark流可靠接收的数据与Zookeeper跟踪的偏移量之间存在不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。偏移量通过其检查点内的Spark流进行跟踪。这消除了Spark流和Zookeeper/Kafka之间的不一致性,因此尽管出现故障,Spark流仍然能够有效地接收每条记录一次。为了为结果的输出实现精确的一次语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务(有关更多信息,请参阅主编程指南中的输出操作语义)。

请注意,这种方法的一个缺点是它不更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监控工具不会显示进度。但是,您可以在每个批处理中访问这种方法处理的偏移量,并自己更新Zookeeper。

注意:案例一基于Receiver的整合在老版本(0.8.*.*-0.10.*.*)和新版本(0.10.*.*以上)都可以使用,但是本机kafka版本是0.11.0.1,所以需要修改pom文件:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * @author YuZhansheng
  * @desc  Spark Streaming对接kafka的方式2——基于Direct的整合
  * @create 2019-02-23 11:28
  */
object KafkaDirectWordCount {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")

        val ssc = new StreamingContext(sparkConf,Seconds(5))

        val topics:String = "kafka_streaming_topic"
        val topicarr = topics.split(",")

        val brokers = "hadoop0:9092,hadoop1:9092,hadoop2:9092,hadoop3:9092"

        val kafkaParams:Map[String,Object] = Map[String,Object](
            "bootstrap.servers" -> brokers,
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "test",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val kafka_streamDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream(
            ssc,PreferConsistent,
            Subscribe[String,String](topicarr,kafkaParams))

        val resDStream: DStream[((Long, Int, String), Int)] = kafka_streamDStream.map(line =>
            (line.offset(), line.partition(), line.value())).
            flatMap(t =>{t._3.split(" ").map(word => (t._1,t._2,word))}).
            map(k => ((k._1,k._2,k._3),1)).reduceByKey(_ + _)

        resDStream.print()

        ssc.start()
        ssc.awaitTermination()
    }
}