Spark 流计算
编辑什么是SparkStreaming
用于快速上手,有很多细节,后面有时间单独出
Spark Streaming 是Spark提供的一个流计算
框架 点击跳转官方文档
与Flink类似,Spark Streaming也可从Kafka。Flume,TCP套接字等众多途径获取数据,也有map(),reduce(),windows()等等一系列的算子
**
Spark Streaming是基于Spark RDD Api(Spark Core Api)的扩展,最终的执行者还是Spark Core API,所以,与Flink不一样,Spark的流计算其实是使用微批实现的,所以在流计算过程可以使用RDD近乎相同的代码编写
具体在工作时,Spark Streaming会将收到的原数据以秒
为单位切片成多个RDD序列,该序列被成为DStream
(Discretized Stream | 离散流),DStream是Spark Streaming的一个高级抽象,用于表示类似Flink DataStream的哪种连续不断
的数据,对一个DStream进行相应的算子计算,相当于对里面所有的RDD
分别进行算子计算
与Flink一样,每个Application的执行是一个长期持久化
的运行任务,所以,每个应用程序都会被分配并长期占用
一个集群的核心(本地运行则为线程),所以,在运行较大项目时,请确保集群资源足够
小试牛刀
如果你是初学者,先快速跟着我体验下Spark流计算
首先我们要对POM文件添加相应的依赖
<!--spark streaming 流式计算库-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
简单颜色下,使用Spark去监听9999端口的套接字数据,并做词频统计
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Test01 {
def main(args: Array[String]): Unit = {
//创建配置对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Test01")
//创建Spark Streaming Context
val ssc = new StreamingContext(conf,Seconds(10))
//你要在创建上下文是显性的告诉Spark DStream的数据时按几秒分割的,这我后面会讲
val stream:ReceiverInputDStream[String] = ssc.socketTextStream("127.0.0.1",9999)
stream
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
//执行
ssc.start() //开始计算
ssc.awaitTermination() //等待计算结果出来
}
}
使用netcat,创建个服务端的套接字接口,随后发两条数据过去
[root@master ~]# nc -lk 9999
aaa bbb
aaa ccc
运行结果如下:
-------------------------------------------
Time: 1666512480000 ms
-------------------------------------------
(bbb,1)
(ccc,1)
(aaa,2)
可以看到,在设置了微批间隔为10秒后,相单于每10秒开一个滚动窗口,不过严格来说不能叫窗口,这仅是各个微批的批大小罢了
当然,为了好理解,前期直接将其看成时间窗口就行
输入
基本上每个DStream 输入都会创建一个Receiver用于持久化监听数据(除了来自文件的流,其他都要),每个Receiver会独占
一个集群核心(本地运行则是线程)
文件流
文件流不仅仅可以是本地文件系统,也可是分布式文件系统,例如HDFS
textFileStream会去检查文件夹内新增
的内容
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Test01 {
def main(args: Array[String]): Unit = {
//创建配置对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Test01")
//创建Spark Streaming Context
val ssc = new StreamingContext(conf,Seconds(10))
val stream:DStream[String] = ssc.textFileStream("file:///D:/SparkProject/word/")
stream
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
//执行
ssc.start()
ssc.awaitTermination()
}
}
在word文件夹创建一个文件 内容如下
aaa bbb
-------------------------------------------
Time: 1666514150000 ms
-------------------------------------------
(aaa,1)
(bbb,1)
套接字
监听本地9999端口,上面小试牛刀演示过了
val stream:ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)
RDD队列
queueStream会监听RDD序列中新增内容
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object Test01 {
def main(args: Array[String]): Unit = {
//创建配置对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Test01")
//创建Spark Streaming Context
val ssc = new StreamingContext(conf,Seconds(10)) //你要在创建上下文是显性的告诉Spark DStream的数据时按几秒分割的
val sc = ssc.sparkContext
val myRDDs = new mutable.Queue[RDD[Int]]()
val stream = ssc.queueStream(myRDDs)
stream
.map((1,_))
.reduce((a,b) => (a._1 + b._1 , a._2 + b._2)) //(count,sum)
.map(d => "数据条数%d;和为%d".format(d._1,d._2))
.print()
//执行
ssc.start() //开始计算
// ssc.awaitTermination() //不等,继续执行
for(i <- 1 to 30){ //循环30次
myRDDs += sc.makeRDD(Array(i,i+1,i+2)) //向序列中新增RDD
Thread.sleep(1000) //间隔1秒,全部跑完要30秒
}
ssc.stop()
}
}
-------------------------------------------
Time: 1666515630000 ms
-------------------------------------------
数据条数3;和为6
-------------------------------------------
Time: 1666515640000 ms
-------------------------------------------
数据条数3;和为9
-------------------------------------------
Time: 1666515650000 ms
-------------------------------------------
数据条数3;和为12
Kafka
这个实战中比较常见,多讲几句
Spark提供了两套Kafka整合方案,spark-streaming-kafka-0-8和spark-streaming-kafka-0-10,前者已经在Spark2.3.0后弃用,所以下面我仅针对0-10
所以我们要在POM文件添加这个依赖
<!--spark 连接kafka-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
<version>2.4.0</version>
</dependency>
我们可以通过调用KafkaUtils
对象的createDirectStream
方法来创建输入流
createDirectStream有三个参数
ssc 就是当前的流环境上下文
locationStrategy
位置策略
,Saprk Streaming提供了三种,具体还在研究,分别是PreferConsistent 它将在所有的 Executors 上均匀分配分区;
PreferBrokers 当 Spark 的 Executor 与 Kafka Broker 在同一机器上时可以选择该选项,它优先将该 Broker 上的首领分区分配给该机器上的 Executor;
PreferFixed 可以指定主题分区与特定主机的映射关系,显示地将分区分配到特定的主机
consumerStrategy
消费者策略
,分为如下两种Subscribe[K,V] 根据主题集合订阅
Subscribe有两个参数
topic 传入一个字符串集合 将订阅集合内所有的Topic
kafkaParams Map[String,Object] 用于定义Kafka消费者的参数
SubscribePattern[K,V] 根据正则订阅
SubscribePattern也有两个参数
pattern java.util.regex.Pattern 正则表达式
kafkaParams Map[String,Object] 用于定义Kafka消费者的参数
监听到的流中的每个数据都是一个ConsumerRecord[K, V]
的实例,其中包含如下内容,通常我们只需要拿到值就行,如下:
ConsumerRecord(
topic = test, /*主题*/
partition = 0, /*分区号*/
offset = 15, /*偏移量*/
CreateTime = 1666527833877, /*消息创建的时间戳*/
serialized key size = -1, /*键序列化器长度*/
serialized value size = 5, /*值序列化器长度*/
headers = RecordHeaders(headers = [], isReadOnly = false), /*头部数据*/
key = null, /*键*/
value = aaaaa /*值*/
)
来,我们简单实操一下
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Level._
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Test01 {
def main(args: Array[String]): Unit = {
//调下日志,看的不爽
Logger.getLogger("org").setLevel(ERROR)
//创建配置对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Test01")
//Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。速度约是Java自带的10 倍。
//这里必须要指定序列化器,不然将会出现Kafka中的消息无法被序列化的情况
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//创建Spark Streaming Context
val ssc = new StreamingContext(conf,Seconds(10))
//定义消费的Topic,支持多个,通过返回值中的topic属性区分
val topics = Array("test")
//配置Kafka消费者的属性
val kafkaParams = Map[String,Object] (
//服务器地址
"bootstrap.servers" -> "192.168.1.2:9092",
//Key的反序列化类
"key.deserializer" -> classOf[StringDeserializer],
//Value的反序列化类
"value.deserializer" -> classOf[StringDeserializer],
//消费者组ID
"group.id" -> "1",
//不让Kafka自动提交偏移量,由Spark提交
"enable.auto.commit" -> (false:java.lang.Boolean)
)
//创建DStream
val stream = KafkaUtils.createDirectStream(
ssc, //指定是流上下文
PreferConsistent, //位置策略(详情看上面的理论)
Subscribe[String,String](topics,kafkaParams) //消费者策略(详情看上面的理论)
)
//刚刚的消息在value中,所以,我们要拿出value中的值进行下一步操作
//剩下的,就是熟悉的计数操作
stream
.map(_.value)
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
ssc.start()
ssc.awaitTermination()
}
}
-------------------------------------------
Time: 1666528560000 ms
-------------------------------------------
(aa,2)
(bb,1)
(cc,1)
Flume
这里是特指因项目奇葩需求,需要直接使用Flume Sink到Spark 而不像常规的方法去依赖Kafka
不过官方已经Spark2.3.0弃用了该方法
,我做实验用的2.1.1,还是可以再玩玩
让Flume直接Sink到Kafka有两种方式可以实现
准备让Flume去监听服务器端9999端口,然后通过两种sink尝试将数据下沉到Spark
avro
尝试过将多个Flume连接在一起的一定对他不陌生,该Sink是Flume原生自带的
先编写FLume配置文件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 9999
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.159.136
a1.sinks.k1.port = 9998
a1.sinks.k1.channel = c1
导入环境
<!--Spark 连接 flume-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_${scala.version}</artifactId>
<version>2.4.1</version>
</dependency>
直接使用
val stream = FlumeUtils.createStream(
ssc,
"192.168.159.136", //绑定avro的主机名
8888 //端口
)
.map(d => new String(d.event.getBody.array()))
stream
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
SparkSink
先放着,我还在研究
Flume安装路径下要放入对应依赖
sink的type = org.apache.spark.streaming.flume.sink.SparkSink
自定义输入源
实现一个自定义数据源,需要实现一个Receiver类
直接来看源码比较直观,我把源码精简了下,想要往深处了解的可以直接翻源码,源码建议小白好好看,不然不知道我接下来在干嘛
//传入一个存储级别,这属于RDD持久化的知识点,我有空整理下发
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
//当Receiver启动时,系统会调用此方法。(需要实现)
def onStart(): Unit
//当Receiver停止时,系统调用此方法。在onStart()中设置的所有资源(线程、缓冲区等)必须在此方法中清除。 (需要实现)
def onStop(): Unit
// 覆盖它以指定一个首选主机名
def preferredLocation: Option[String] = None
// 将接受的数据聚合在一起,然后放入Spark的内存中
def store(dataItem: T) {
supervisor.pushSingle(dataItem)
}
//报告接收数据的异常。
def reportError(message: String, throwable: Throwable) {
supervisor.reportError(message, throwable)
}
//通过调用onStop()和onStart()重启Receiver,关闭与启动的间隔可在Spark配置文件Spark.streaming.receiverrestartdelay中定义
def restart(message: String) {
supervisor.restartReceiver(message)
}
// 终止Receiver
def stop(message: String) {
supervisor.stop(message, None)
}
// 返回Receiver是否启动
def isStarted(): Boolean = {
supervisor.isReceiverStarted()
}
// 返回Receiver是否关闭
def isStopped(): Boolean = {
supervisor.isReceiverStopped()
}
//返回Receiver的输入流的ID
def streamId: Int = id
/** Identifier of the stream this receiver is associated with. */
private var id: Int = -1
/** Handler object that runs the receiver. This is instantiated lazily in the worker. */
@transient private var _supervisor: ReceiverSupervisor = null
/** Set the ID of the DStream that this receiver is associated with. */
private[streaming] def setReceiverId(_id: Int) {
id = _id
}
/** Attach Network Receiver executor to this receiver. */
private[streaming] def attachSupervisor(exec: ReceiverSupervisor) {
assert(_supervisor == null)
_supervisor = exec
}
/** Get the attached supervisor. */
private[streaming] def supervisor: ReceiverSupervisor = {
assert(_supervisor != null,
"A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
"some computation in the receiver before the Receiver.onStart() has been called.")
_supervisor
}
}
来,实操,自定义一个Receiver用于监听套接字
我们需要实现其中的onStart()与onStop()方法
import org.apache.log4j.Level._
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
object Test01 {
def main(args: Array[String]): Unit = {
//调下日志,看的不舒服
Logger.getLogger("org").setLevel(ERROR)
//创建配置对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Test01")
//创建Spark Streaming Context
val ssc = new StreamingContext(conf,Seconds(10))
val stream = ssc //直接传入实例化对象
.receiverStream(new MySocketReceiver("192.168.159.136",9999))
stream
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
ssc.start()
ssc.awaitTermination()
}
}
//自定义Receiver
class MySocketReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
//先定义一个方法,用于连接Socket并接收数据
private def receiver(): Unit ={
var socket:Socket = null
var input:String = null
try {
socket = new Socket(host,port)
//读数据
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream,"UTF-8")
)
input = reader.readLine()
while (isStarted && input != null){ //Receiver启动且输入非空
store(input) //放入Spark的内存中
input = reader.readLine() //下一条
}
//释放资源
reader.close()
socket.close()
//重启Receiver,在套接字服务器再次启动时还能继续连接
restart("Try connect again and restart receiver...")
}catch {
//连接异常
case ex1:java.net.ConnectException => {
//尝试重新连接
restart("As error occurred while connecting to [%s:%d] ,Try connect again now...".format(host,port))
}
//未知异常,停止Receiver
case ex02:Exception => {
stop("Unknown error,receiver stopped...")
}
}
}
override def onStart(): Unit = {
//定义一个线程,帮我开启刚刚写的连接器
new Thread("Socket Receiver"){
override def run(): Unit = {
receiver()
}
}.start()
}
override def onStop(): Unit = {
Logger.getRootLogger.info("Receiver Stopped Now...")
}
}
-------------------------------------------
Time: 1666593630000 ms
-------------------------------------------
(bbb,1)
(ddd,1)
(ccc,1)
(aaa,3)
操作
我个人将数据转换、窗口计算,合并至操作中
转换操作
使用起来和RDD算子类似,我不多赘述,
这里面有个特别的算子[updateStateByKey]
其他算子都仅计算当前微批的数据,这类算子被称为
无状态算子
updateStateByKey在计算是会保留key的状态,下次计算会根据前一次的key继续转换,这被成为
有状态算子
窗口操作
Spark仅支持滑动窗口(滚动是特殊的滑动)
Spark的开窗与Flink不一样,Spark开窗前后的数据类型不变,只是将DStream中的各个窗口的RDD集合成一个新的RDD,Spark的开窗也是直接将开窗与转换的动作合并到一个算子
由于Spark Streaming天生微批的特性,所以,定义窗口长度(windowLength)与窗口间隔(slideInterval)时,
必须是微批时间间隔的倍数
一般我习惯将微批间隔调偏小。可以方便我开窗,也能提升计算的时效性,不过这也不是百利无害,过度缩小微批间隔会导致应用程序的吞吐量下降
实操一下,用窗口每隔5秒统计最经15秒的数据,对数据进行词频统计
import org.apache.log4j.Level._
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Test01 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(ERROR)
//创建配置对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Test01")
//创建Spark Streaming Context
val ssc = new StreamingContext(conf,Seconds(5)) //取最大公因数
val stream = ssc.socketTextStream("192.168.159.136",9999)
stream
.flatMap(_.split(" "))
.map((_,1))
.groupByKeyAndWindow(Seconds(15),Seconds(5)) //又ByKey 又ByWindow
.map(d => (d._1,d._2.sum)) //(aa,ArrayBuffer(1, 1, 1, 1)) => (aa,4)
.print()
ssc.start()
ssc.awaitTermination()
}
}
-------------------------------------------
Time: 1666596635000 ms
-------------------------------------------
(cc,1)
(bb,1)
(aa,2)
-------------------------------------------
Time: 1666596640000 ms
-------------------------------------------
(cc,1)
(bb,1)
(aa,3)
(dd,1)
-------------------------------------------
Time: 1666596645000 ms
-------------------------------------------
(ff,1)
(cc,1)
(ee,1)
(bb,1)
(aa,6)
(gg,1)
(dd,1)
-------------------------------------------
Time: 1666596650000 ms
-------------------------------------------
(ff,1)
(ee,1)
(aa,4)
(gg,1)
(dd,1)
-------------------------------------------
Time: 1666596655000 ms
-------------------------------------------
(ff,1)
(ee,1)
(aa,3)
(gg,1)
话说,有没有哪个小天才求出我这几秒的输入内容
事件时间窗口
上面演示的是基于处理时间段窗口
在处理事件时间前,先快速了解些概念
三大时间语义
例如你在打网络游戏,当你按下技能键时,这个时间就是事件时间,而当这个指令传输到服务器时就是进入时间,服务器处理这个命令的时间就是处理时间,在代码中,往往更加关系事件时间。
使用起来也简单,官方解释说因为这种窗口和分组相似,所以我们可以使用Spark SQL API的*groupBy()结合window()*实现
import spark.implicits._
val words = ... // 流数据的Schema { timestamp: Timestamp, word: String }
// 将数据按窗口和单词分组,并计算每组的计数
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
输出操作
类似RDD,DStream也可输出到外部系统
前面好几个例子中用到的print()就是一个简单的输出操作
ssc
.socketTextStream("192.168.159.136",9999)
.print()
官方认为foreachRDD(func)是一个非常强大的算子,如何正确且有效的使用它非常重要
原句,不是我瞎说
dstream.foreachRDD
is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently
所以让我们看一遍官方的案例
通常,向外部系统写入数据通常需要创建一个连接对象(例如,到远程服务器的TCP连接),并使用它向远程系统发送数据。为此,开发人员可能会无意中尝试在Spark driver中创建一个连接对象,然后尝试在Spark worker中使用它来保存rdd中的记录。
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // driver段创建连接对象
rdd.foreach { record =>
connection.send(record) // worker端使用连接对象保存数据
}
}
这是不正确的,因为这需要连接对象被序列化,并从driver端发送到worker端。这样的连接对象很少可以跨机器转移。所以可能会出现序列化错误(连接对象不可序列化)、初始化错误(连接对象需要在工作者处初始化)等。正确的解决方案是在工作者处创建连接对象。
然而,这可能会导致另一个常见的错误——为每条记录创建一个新连接,如下
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection() // driver段创建连接对象
connection.send(record) // worker端使用连接对象保存数据
connection.close()
}
}
创建连接对象需要花费时间和资源。因此,为每个记录创建和销毁一个连接对象可能会产生不必要的高开销,并会显著降低系统的总体吞吐量。更好的解决方案是使用rdd.foreachPartition创建一个连接对象,并使用该连接发送RDD分区中的所有记录。如下:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
这将在许多记录上分摊创建连接的开销。
最后,可以通过跨多个rdd/批重用连接对象来进一步优化。可以维护一个静态的连接对象池,当多个批的rdd被推到外部系统时,可以重用该连接对象池,从而进一步减少开销。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool是一个静态的、惰性初始化的连接池
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // 返回到池中以便将来重用
}
}
注意,池中的连接应该按需惰性创建,如果一段时间不使用,则会超时。这实现了将数据最有效地发送到外部系统。
DStream 持久化
DStream与RDD一样,也允许开发人员将其数据(里面的所有RDD)持久化
方法也很简单,只需要使用persist()方法就行
val stream = ssc.socketTextStream("master",9999)
.persist(
StorageLevel.MEMORY_ONLY //我们可以在此定义持久化的存储级别,默认将数据持久化到内存
)
在涉及开窗
以及有状态
操作后,Spark Streaming会自动执行
persist
DStream 检测点
Spark Streaming应用程序通常7*24小时持续运行,一下非应用程序开发逻辑导致的错误(例如系统故障,JVM故障)不应该影响应用程序的运行
这时检查点机制就很重要,就先游戏中的检查点,它会记录当前的状态,保存到容错系统中,以便随时恢复某一时间段状态
元数据检查点
将定义流计算的信息保存到HDFS等容错存储中。这用于从运行流应用程序驱动程序的节点的故障中恢复(稍后将详细讨论)。元数据包括:
- 应用程序配置 ——> 用于创建流应用程序的配置。
- DStream操作 ——> 定义流应用程序的DStream操作集。
- 未完成批次 ——> 在排队中但尚未完成的作业批次。
数据检查点
将生成的RDD保存到可靠的存储中。
这在一些跨多个批组合数据的有状态转换中是必要的。在这样的转换中,生成的RDD依赖于前一批的RDD,这导致依赖链的长度随着时间不断增加。为了避免恢复时间的无限增加,有状态转换的中间RDD会定期设置检查点到可靠的存储(如HDFS),以切断依赖链。
在以下场景必须要启用检查点:
在计算过程中有涉及有状态转换:
如果在应用程序中使用了updateStateByKey或reduceByKeyAndWindow(有状态算子),那么必须提供检查点目录以允许定期的RDD检查点。
从运行应用程序的Driver故障中恢复,使用元数据检查点来海恢复进度信息。
若你的应用程序未涉及有状态计算,可以允许一些被接受但未被处理的数据丢失,那可以不用配置检查点
如何配置检查点
我们可以通过*streamingContext.checkpoint(一个路径:String)*启用检查点,通常来说,检查点放在一个容错、可靠的文件系统(如HDFS)中
用官方案例改改
import org.apache.log4j.Level._
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Test01 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(ERROR)
//拿到SparkStreamingContext
val ssc = StreamingContext
.getOrCreate(
this.checkpointPath,
createContext _ //比较坑,只能吃() => StreamingContext
) //从创建函数获取
val stream = ssc.socketTextStream("master",9999)
stream
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
ssc.start()
ssc.awaitTermination()
}
//检查点路径
val checkpointPath = "hdfs://master:9999/sparkCheckpoint"
//定义个函数,用于从检查点获取SparkStreamingContext
def createContext(): StreamingContext = {
//创建配置对象
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Test01")
//创建Spark Streaming Context
val ssc = new StreamingContext(conf,Seconds(5))
//通过流上下文设置检查点,将其储存在HDFS
ssc.checkpoint(this.checkpointPath)
ssc
}
}
- 0
-
赞助
微信支付宝 -
分享