残月的技术日志

残月的技术日志

Spark 流计算

1023-06-23

什么是SparkStreaming

用于快速上手,有很多细节,后面有时间单独出

Spark Streaming 是Spark提供的一个流计算框架 点击跳转官方文档

与Flink类似,Spark Streaming也可从Kafka。Flume,TCP套接字等众多途径获取数据,也有map(),reduce(),windows()等等一系列的算子

**

Spark Streaming是基于Spark RDD Api(Spark Core Api)的扩展,最终的执行者还是Spark Core API,所以,与Flink不一样,Spark的流计算其实是使用微批实现的,所以在流计算过程可以使用RDD近乎相同的代码编写

具体在工作时,Spark Streaming会将收到的原数据以为单位切片成多个RDD序列,该序列被成为DStream(Discretized Stream | 离散流),DStream是Spark Streaming的一个高级抽象,用于表示类似Flink DataStream的哪种连续不断的数据,对一个DStream进行相应的算子计算,相当于对里面所有的RDD分别进行算子计算

与Flink一样,每个Application的执行是一个长期持久化的运行任务,所以,每个应用程序都会被分配并长期占用一个集群的核心(本地运行则为线程),所以,在运行较大项目时,请确保集群资源足够

小试牛刀

如果你是初学者,先快速跟着我体验下Spark流计算

首先我们要对POM文件添加相应的依赖

<!--spark streaming 流式计算库-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

简单颜色下,使用Spark去监听9999端口的套接字数据,并做词频统计

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test01 {
  def main(args: Array[String]): Unit = {
    //创建配置对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("Test01")
    //创建Spark Streaming Context
    val ssc = new StreamingContext(conf,Seconds(10))
    //你要在创建上下文是显性的告诉Spark DStream的数据时按几秒分割的,这我后面会讲

    val stream:ReceiverInputDStream[String] = ssc.socketTextStream("127.0.0.1",9999)

    stream
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    //执行
    ssc.start()               //开始计算
    ssc.awaitTermination()    //等待计算结果出来
  }
}

使用netcat,创建个服务端的套接字接口,随后发两条数据过去

[root@master ~]# nc -lk 9999
aaa bbb
aaa ccc

运行结果如下:

-------------------------------------------
Time: 1666512480000 ms
-------------------------------------------
(bbb,1)
(ccc,1)
(aaa,2)

可以看到,在设置了微批间隔为10秒后,相单于每10秒开一个滚动窗口,不过严格来说不能叫窗口,这仅是各个微批的批大小罢了

当然,为了好理解,前期直接将其看成时间窗口就行

输入

基本上每个DStream 输入都会创建一个Receiver用于持久化监听数据(除了来自文件的流,其他都要),每个Receiver会独占一个集群核心(本地运行则是线程)

文件流

文件流不仅仅可以是本地文件系统,也可是分布式文件系统,例如HDFS

textFileStream会去检查文件夹内新增的内容

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test01 {
  def main(args: Array[String]): Unit = {
    //创建配置对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("Test01")
    //创建Spark Streaming Context
    val ssc = new StreamingContext(conf,Seconds(10))

    val stream:DStream[String] = ssc.textFileStream("file:///D:/SparkProject/word/")

    stream
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()


    //执行
    ssc.start()
    ssc.awaitTermination()
  }
}

在word文件夹创建一个文件 内容如下

aaa bbb
-------------------------------------------
Time: 1666514150000 ms
-------------------------------------------
(aaa,1)
(bbb,1)

套接字

监听本地9999端口,上面小试牛刀演示过了

val stream:ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)

RDD队列

queueStream会监听RDD序列中新增内容

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object Test01 {
  def main(args: Array[String]): Unit = {
    //创建配置对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("Test01")
    //创建Spark Streaming Context
    val ssc = new StreamingContext(conf,Seconds(10))    //你要在创建上下文是显性的告诉Spark DStream的数据时按几秒分割的

    val sc = ssc.sparkContext

    val myRDDs = new mutable.Queue[RDD[Int]]()


    val stream = ssc.queueStream(myRDDs)

    stream
      .map((1,_))
      .reduce((a,b) => (a._1 + b._1 , a._2 + b._2))    //(count,sum)
      .map(d => "数据条数%d;和为%d".format(d._1,d._2))
      .print()


    //执行
    ssc.start()               //开始计算
//    ssc.awaitTermination()    //不等,继续执行

    for(i <- 1 to 30){     //循环30次
      myRDDs += sc.makeRDD(Array(i,i+1,i+2))   //向序列中新增RDD
      Thread.sleep(1000)    //间隔1秒,全部跑完要30秒
    }

    ssc.stop()
  }
}
-------------------------------------------
Time: 1666515630000 ms
-------------------------------------------
数据条数3;和为6

-------------------------------------------
Time: 1666515640000 ms
-------------------------------------------
数据条数3;和为9

-------------------------------------------
Time: 1666515650000 ms
-------------------------------------------
数据条数3;和为12

Kafka

这个实战中比较常见,多讲几句

Spark提供了两套Kafka整合方案,spark-streaming-kafka-0-8spark-streaming-kafka-0-10,前者已经在Spark2.3.0后弃用,所以下面我仅针对0-10

所以我们要在POM文件添加这个依赖

<!--spark 连接kafka-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
    <version>2.4.0</version>
</dependency>

我们可以通过调用KafkaUtils对象的createDirectStream方法来创建输入流

createDirectStream有三个参数

  • ssc 就是当前的流环境上下文

  • locationStrategy 位置策略,Saprk Streaming提供了三种,具体还在研究,分别是

  • PreferConsistent 它将在所有的 Executors 上均匀分配分区;

    • PreferBrokers 当 Spark 的 Executor 与 Kafka Broker 在同一机器上时可以选择该选项,它优先将该 Broker 上的首领分区分配给该机器上的 Executor;

    • PreferFixed 可以指定主题分区与特定主机的映射关系,显示地将分区分配到特定的主机

  • consumerStrategy 消费者策略,分为如下两种

  • Subscribe[K,V] 根据主题集合订阅

    Subscribe有两个参数

    topic 传入一个字符串集合 将订阅集合内所有的Topic

    kafkaParams Map[String,Object] 用于定义Kafka消费者的参数

    • SubscribePattern[K,V] 根据正则订阅

      SubscribePattern也有两个参数

      ​ pattern java.util.regex.Pattern 正则表达式

      kafkaParams Map[String,Object] 用于定义Kafka消费者的参数


监听到的流中的每个数据都是一个ConsumerRecord[K, V] 的实例,其中包含如下内容,通常我们只需要拿到值就行,如下:

ConsumerRecord(
     topic = test,                                                /*主题*/
     partition = 0,                                               /*分区号*/
     offset = 15,                                                 /*偏移量*/
     CreateTime = 1666527833877,                                  /*消息创建的时间戳*/
     serialized key size = -1,                                    /*键序列化器长度*/
     serialized value size = 5,                                   /*值序列化器长度*/
     headers = RecordHeaders(headers = [], isReadOnly = false),   /*头部数据*/
     key = null,                                                  /*键*/
     value = aaaaa                                                /*值*/
)

来,我们简单实操一下

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Level._
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test01 {
  def main(args: Array[String]): Unit = {
    //调下日志,看的不爽
    Logger.getLogger("org").setLevel(ERROR)
    //创建配置对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("Test01")
    //Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。速度约是Java自带的10 倍。
    //这里必须要指定序列化器,不然将会出现Kafka中的消息无法被序列化的情况
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //创建Spark Streaming Context
    val ssc = new StreamingContext(conf,Seconds(10))

    //定义消费的Topic,支持多个,通过返回值中的topic属性区分
    val topics = Array("test")

    //配置Kafka消费者的属性
    val kafkaParams = Map[String,Object] (
      //服务器地址
      "bootstrap.servers" -> "192.168.1.2:9092",
      //Key的反序列化类
      "key.deserializer" -> classOf[StringDeserializer],
      //Value的反序列化类
      "value.deserializer" -> classOf[StringDeserializer],
      //消费者组ID
      "group.id" -> "1",
      //不让Kafka自动提交偏移量,由Spark提交
      "enable.auto.commit" -> (false:java.lang.Boolean)
    )

    //创建DStream
    val stream = KafkaUtils.createDirectStream(
      ssc,                                                 //指定是流上下文
      PreferConsistent,                                    //位置策略(详情看上面的理论)
      Subscribe[String,String](topics,kafkaParams)         //消费者策略(详情看上面的理论)

    )

    //刚刚的消息在value中,所以,我们要拿出value中的值进行下一步操作
    //剩下的,就是熟悉的计数操作
    stream
      .map(_.value)
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
-------------------------------------------
Time: 1666528560000 ms
-------------------------------------------
(aa,2)
(bb,1)
(cc,1)

Flume

这里是特指因项目奇葩需求,需要直接使用Flume Sink到Spark 而不像常规的方法去依赖Kafka

不过官方已经Spark2.3.0弃用了该方法,我做实验用的2.1.1,还是可以再玩玩

让Flume直接Sink到Kafka有两种方式可以实现

准备让Flume去监听服务器端9999端口,然后通过两种sink尝试将数据下沉到Spark

avro

尝试过将多个Flume连接在一起的一定对他不陌生,该Sink是Flume原生自带的

先编写FLume配置文件

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 9999
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.159.136
a1.sinks.k1.port = 9998
a1.sinks.k1.channel = c1

导入环境

<!--Spark 连接 flume-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_${scala.version}</artifactId>
    <version>2.4.1</version>
</dependency>

直接使用

val stream = FlumeUtils.createStream(
      ssc,
      "192.168.159.136",        //绑定avro的主机名
      8888                          //端口
    )
      .map(d => new String(d.event.getBody.array()))

stream
  .flatMap(_.split(" "))
  .map((_,1))
  .reduceByKey(_+_)
  .print()

SparkSink

先放着,我还在研究

Flume安装路径下要放入对应依赖

sink的type = org.apache.spark.streaming.flume.sink.SparkSink

自定义输入源

实现一个自定义数据源,需要实现一个Receiver类

直接来看源码比较直观,我把源码精简了下,想要往深处了解的可以直接翻源码,源码建议小白好好看,不然不知道我接下来在干嘛

//传入一个存储级别,这属于RDD持久化的知识点,我有空整理下发
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

  //当Receiver启动时,系统会调用此方法。(需要实现)
  def onStart(): Unit

  //当Receiver停止时,系统调用此方法。在onStart()中设置的所有资源(线程、缓冲区等)必须在此方法中清除。 (需要实现)
  def onStop(): Unit

  // 覆盖它以指定一个首选主机名
  def preferredLocation: Option[String] = None

  // 将接受的数据聚合在一起,然后放入Spark的内存中
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

  //报告接收数据的异常。
  def reportError(message: String, throwable: Throwable) {
    supervisor.reportError(message, throwable)
  }

  //通过调用onStop()和onStart()重启Receiver,关闭与启动的间隔可在Spark配置文件Spark.streaming.receiverrestartdelay中定义
  def restart(message: String) {
    supervisor.restartReceiver(message)
  }

  // 终止Receiver
  def stop(message: String) {
    supervisor.stop(message, None)
  }

  // 返回Receiver是否启动
  def isStarted(): Boolean = {
    supervisor.isReceiverStarted()
  }

  // 返回Receiver是否关闭
  def isStopped(): Boolean = {
    supervisor.isReceiverStopped()
  }

  //返回Receiver的输入流的ID
  def streamId: Int = id

  /** Identifier of the stream this receiver is associated with. */
  private var id: Int = -1

  /** Handler object that runs the receiver. This is instantiated lazily in the worker. */
  @transient private var _supervisor: ReceiverSupervisor = null

  /** Set the ID of the DStream that this receiver is associated with. */
  private[streaming] def setReceiverId(_id: Int) {
    id = _id
  }

  /** Attach Network Receiver executor to this receiver. */
  private[streaming] def attachSupervisor(exec: ReceiverSupervisor) {
    assert(_supervisor == null)
    _supervisor = exec
  }

  /** Get the attached supervisor. */
  private[streaming] def supervisor: ReceiverSupervisor = {
    assert(_supervisor != null,
      "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
        "some computation in the receiver before the Receiver.onStart() has been called.")
    _supervisor
  }
}

来,实操,自定义一个Receiver用于监听套接字

我们需要实现其中的onStart()与onStop()方法

import org.apache.log4j.Level._
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

object Test01 {
  def main(args: Array[String]): Unit = {
    //调下日志,看的不舒服
    Logger.getLogger("org").setLevel(ERROR)
    //创建配置对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("Test01")
    //创建Spark Streaming Context
    val ssc = new StreamingContext(conf,Seconds(10))

    val stream = ssc             //直接传入实例化对象
      .receiverStream(new MySocketReceiver("192.168.159.136",9999))

    stream
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

//自定义Receiver
class MySocketReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  //先定义一个方法,用于连接Socket并接收数据
  private def receiver(): Unit ={
    var socket:Socket = null
    var input:String = null
    try {
      socket = new Socket(host,port)
      //读数据
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream,"UTF-8")
      )
      input = reader.readLine()
      while (isStarted && input != null){     //Receiver启动且输入非空
        store(input)               //放入Spark的内存中
        input = reader.readLine()  //下一条
      }
      //释放资源
      reader.close()
      socket.close()
      //重启Receiver,在套接字服务器再次启动时还能继续连接
      restart("Try connect again and restart receiver...")
    }catch {
          //连接异常
      case ex1:java.net.ConnectException => {
        //尝试重新连接
        restart("As error occurred while connecting to [%s:%d] ,Try connect again now...".format(host,port))
      }
        //未知异常,停止Receiver
      case ex02:Exception => {
        stop("Unknown error,receiver stopped...")
      }
    }
  }

  override def onStart(): Unit = {
    //定义一个线程,帮我开启刚刚写的连接器
    new Thread("Socket Receiver"){
      override def run(): Unit = {
        receiver()
      }
    }.start()
  }

  override def onStop(): Unit = {
    Logger.getRootLogger.info("Receiver Stopped Now...")
  }
}
-------------------------------------------
Time: 1666593630000 ms
-------------------------------------------
(bbb,1)
(ddd,1)
(ccc,1)
(aaa,3)

操作

我个人将数据转换、窗口计算,合并至操作中

转换操作

使用起来和RDD算子类似,我不多赘述,

转换

概述

map(func)

通过函数func传递源DStream的每个元素,返回一个新的DStream。

flatMap(func)

类似于map,但是每个输入项都可以映射到0个或多个输出项。

filter(func)

通过func的返回,保留true的记录,返回一个新的DStream。

repartition(numPartitions)

通过创建更多或更少的分区来改变此DStream中的并行级别。

union(otherStream)

返回一个新的DStream,其中包含源DStream和otherDStream中的元素的并集。

count()

通过计算源DStream的每个RDD中的元素数量,返回一个单元素RDD的新DStream。

reduce(func)

通过使用函数func(接受两个参数并返回一个参数)聚合源DStream的每个RDD中的元素,返回一个由单元素RDD组成的新DStream。函数应该是结合律和交换律,这样才能并行计算。

countByValue()

当对类型为K的元素的DStream调用时,返回一个由(K, Long)对组成的新DStream,其中每个键的值是它在源DStream的每个RDD中的频率。

reduceByKey(func, [numTasks])

当在(K, V)对DStream上调用时,返回一个新的(K, V)对DStream,其中每个键的值使用给定的reduce函数聚合。**注意:**默认情况下,这使用Spark的默认并行任务数(本地模式为2,集群模式下由配置属性' Spark .default.parallelism '决定)来进行分组。你可以通过一个可选的numTasks参数来设置不同数量的任务。

join(otherStream, [numTasks])

当调用两个(K, V)和(K, W)对的DStream时,返回一个新的(K, (V, W))对的DStream,其中包含每个键的所有元素对。

cogroup(otherStream, [numTasks])

当调用(K, V)和(K, W)对的DStream时,返回一个新的(K, Seq[V], Seq[W])元组的DStream。

transform(func)

通过对源DStream的每个RDD应用RDD-to-RDD函数,返回一个新的DStream。这可以用于在DStream上执行任意的RDD操作。

updateStateByKey(func)

返回一个带新的“状态”的DStream,其中每个键的状态通过在键的前一个状态和键的新值上应用给定函数来更新。这可用于维护每个键的任意状态数据。

这里面有个特别的算子[updateStateByKey]

其他算子都仅计算当前微批的数据,这类算子被称为无状态算子

updateStateByKey在计算是会保留key的状态,下次计算会根据前一次的key继续转换,这被成为有状态算子

窗口操作

Spark仅支持滑动窗口(滚动是特殊的滑动)

Spark的开窗与Flink不一样,Spark开窗前后的数据类型不变,只是将DStream中的各个窗口的RDD集合成一个新的RDD,Spark的开窗也是直接将开窗与转换的动作合并到一个算子

转换

描述

window(windowLength, slideInterval)

返回一个基于源DStream的窗口批次计算后得到新的DStream。(单纯的RDD合并)

countByWindow(windowLength,slideInterval)

返回基于滑动窗口的DStream中的元素的数量。

reduceByWindow(func, windowLength,slideInterval)

基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])

基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。

reduceByKeyAndWindow(func,invFunc,windowLength, slideInterval, [numTasks])

一个更高效的reduceByKeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。

countByValueAndWindow(windowLength,slideInterval, [numTasks])

基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。

由于Spark Streaming天生微批的特性,所以,定义窗口长度(windowLength)与窗口间隔(slideInterval)时,必须是微批时间间隔的倍数

一般我习惯将微批间隔调偏小。可以方便我开窗,也能提升计算的时效性,不过这也不是百利无害,过度缩小微批间隔会导致应用程序的吞吐量下降

实操一下,用窗口每隔5秒统计最经15秒的数据,对数据进行词频统计

import org.apache.log4j.Level._
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test01 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(ERROR)
    //创建配置对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("Test01")
    //创建Spark Streaming Context
    val ssc = new StreamingContext(conf,Seconds(5))    //取最大公因数

    val stream = ssc.socketTextStream("192.168.159.136",9999)

    stream
      .flatMap(_.split(" "))
      .map((_,1))
      .groupByKeyAndWindow(Seconds(15),Seconds(5))    //又ByKey 又ByWindow
      .map(d => (d._1,d._2.sum))        //(aa,ArrayBuffer(1, 1, 1, 1))  => (aa,4)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
-------------------------------------------
Time: 1666596635000 ms
-------------------------------------------
(cc,1)
(bb,1)
(aa,2)

-------------------------------------------
Time: 1666596640000 ms
-------------------------------------------
(cc,1)
(bb,1)
(aa,3)
(dd,1)

-------------------------------------------
Time: 1666596645000 ms
-------------------------------------------
(ff,1)
(cc,1)
(ee,1)
(bb,1)
(aa,6)
(gg,1)
(dd,1)

-------------------------------------------
Time: 1666596650000 ms
-------------------------------------------
(ff,1)
(ee,1)
(aa,4)
(gg,1)
(dd,1)

-------------------------------------------
Time: 1666596655000 ms
-------------------------------------------
(ff,1)
(ee,1)
(aa,3)
(gg,1)

话说,有没有哪个小天才求出我这几秒的输入内容

事件时间窗口

上面演示的是基于处理时间段窗口

在处理事件时间前,先快速了解些概念


三大时间语义

Spark流计算-时间语义.png

例如你在打网络游戏,当你按下技能键时,这个时间就是事件时间,而当这个指令传输到服务器时就是进入时间,服务器处理这个命令的时间就是处理时间,在代码中,往往更加关系事件时间。


官方文档

使用起来也简单,官方解释说因为这种窗口和分组相似,所以我们可以使用Spark SQL API的*groupBy()结合window()*实现

import spark.implicits._

val words = ... // 流数据的Schema { timestamp: Timestamp, word: String }

// 将数据按窗口和单词分组,并计算每组的计数
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

输出操作

类似RDD,DStream也可输出到外部系统

输出选项

描述

print()

在运行流应用程序的驱动节点上打印DStream中每批数据的前十个元素。官方认为这对于开发和调试非常有用。
Python API中不可用

saveAsTextFiles(prefix, [suffix])

将此DStream的内容保存为文本文件

saveAsObjectFiles(prefix, [suffix])

将这个DStream的内容保存为序列化Java对象的SequenceFiles
Python API中不可用

saveAsHadoopFiles(prefix, [suffix])

将这个DStream的内容保存为Hadoop文件。
Python API中不可用

foreachRDD(func)

通用的输出操作,将func用于于DStream中所有的RDD。该功能应将每个RDD中的数据推入外部系统,例如将RDD保存到文件中,或通过网络将其写入数据库。注意,函数func是在运行流应用程序的驱动进程中执行的

前面好几个例子中用到的print()就是一个简单的输出操作

ssc
    .socketTextStream("192.168.159.136",9999)
    .print()

官方认为foreachRDD(func)是一个非常强大的算子,如何正确且有效的使用它非常重要

原句,不是我瞎说

dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently

所以让我们看一遍官方的案例

通常,向外部系统写入数据通常需要创建一个连接对象(例如,到远程服务器的TCP连接),并使用它向远程系统发送数据。为此,开发人员可能会无意中尝试在Spark driver中创建一个连接对象,然后尝试在Spark worker中使用它来保存rdd中的记录。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // driver段创建连接对象
  rdd.foreach { record =>
    connection.send(record) // worker端使用连接对象保存数据
  }
}

这是不正确的,因为这需要连接对象被序列化,并从driver端发送到worker端。这样的连接对象很少可以跨机器转移。所以可能会出现序列化错误(连接对象不可序列化)、初始化错误(连接对象需要在工作者处初始化)等。正确的解决方案是在工作者处创建连接对象。

然而,这可能会导致另一个常见的错误——为每条记录创建一个新连接,如下

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()   // driver段创建连接对象
    connection.send(record)     // worker端使用连接对象保存数据
    connection.close()
  }
}

创建连接对象需要花费时间和资源。因此,为每个记录创建和销毁一个连接对象可能会产生不必要的高开销,并会显著降低系统的总体吞吐量。更好的解决方案是使用rdd.foreachPartition创建一个连接对象,并使用该连接发送RDD分区中的所有记录。如下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

这将在许多记录上分摊创建连接的开销。

最后,可以通过跨多个rdd/批重用连接对象来进一步优化。可以维护一个静态的连接对象池,当多个批的rdd被推到外部系统时,可以重用该连接对象池,从而进一步减少开销。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool是一个静态的、惰性初始化的连接池
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 返回到池中以便将来重用
  }
}

注意,池中的连接应该按需惰性创建,如果一段时间不使用,则会超时。这实现了将数据最有效地发送到外部系统。

DStream 持久化

DStream与RDD一样,也允许开发人员将其数据(里面的所有RDD)持久化

方法也很简单,只需要使用persist()方法就行

val stream = ssc.socketTextStream("master",9999)
  .persist(
    StorageLevel.MEMORY_ONLY    //我们可以在此定义持久化的存储级别,默认将数据持久化到内存
  )

在涉及开窗以及有状态操作后,Spark Streaming会自动执行persist

DStream 检测点

Spark Streaming应用程序通常7*24小时持续运行,一下非应用程序开发逻辑导致的错误(例如系统故障,JVM故障)不应该影响应用程序的运行

这时检查点机制就很重要,就先游戏中的检查点,它会记录当前的状态,保存到容错系统中,以便随时恢复某一时间段状态

  • 元数据检查点

​ 将定义流计算的信息保存到HDFS等容错存储中。这用于从运行流应用程序驱动程序的节点的故障中恢复(稍后将详细讨论)。元数据包括:

​ - 应用程序配置 ——> 用于创建流应用程序的配置。

​ - DStream操作 ——> 定义流应用程序的DStream操作集。

​ - 未完成批次 ——> 在排队中但尚未完成的作业批次。

  • 数据检查点

将生成的RDD保存到可靠的存储中。

这在一些跨多个批组合数据的有状态转换中是必要的。在这样的转换中,生成的RDD依赖于前一批的RDD,这导致依赖链的长度随着时间不断增加。为了避免恢复时间的无限增加,有状态转换的中间RDD会定期设置检查点到可靠的存储(如HDFS),以切断依赖链。


在以下场景必须要启用检查点:

  • 在计算过程中有涉及有状态转换:

    • 如果在应用程序中使用了updateStateByKey或reduceByKeyAndWindow(有状态算子),那么必须提供检查点目录以允许定期的RDD检查点。

    • 从运行应用程序的Driver故障中恢复,使用元数据检查点来海恢复进度信息。

若你的应用程序未涉及有状态计算,可以允许一些被接受但未被处理的数据丢失,那可以不用配置检查点

如何配置检查点

我们可以通过*streamingContext.checkpoint(一个路径:String)*启用检查点,通常来说,检查点放在一个容错、可靠的文件系统(如HDFS)中

用官方案例改改

import org.apache.log4j.Level._
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test01 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(ERROR)

    //拿到SparkStreamingContext
    val ssc = StreamingContext
      .getOrCreate(
        this.checkpointPath,
        createContext _         //比较坑,只能吃() => StreamingContext
      )           //从创建函数获取

    val stream = ssc.socketTextStream("master",9999)
    stream
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }

  //检查点路径
  val checkpointPath = "hdfs://master:9999/sparkCheckpoint"

  //定义个函数,用于从检查点获取SparkStreamingContext
  def createContext(): StreamingContext = {
    //创建配置对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("Test01")

    //创建Spark Streaming Context
    val ssc = new StreamingContext(conf,Seconds(5))

    //通过流上下文设置检查点,将其储存在HDFS
    ssc.checkpoint(this.checkpointPath)

    ssc
  }
}

  • 0