残月的技术日志

残月的技术日志

Flink 状态管理_StateAPI

2023-06-18

Flink的状态

何为状态(state):可以理解成在计算的过程中产生的数据、变量等等等等,例如我们做词频计数时,单词的数量就是状态,每当对已经统计的数量进行更新时,这个更新就是状态的更新

Flink 天生可以对数据进行状态计算,且提供了几种由Flink托管的状态

Flink应用程序的状态通常是在本地进行,这么做为了提高吞吐量以及降低延迟,Flink应用程序一般都将状态存储在JVM堆内存中。

通过状态快照,Flink可以提供可容错的、可精确到一次的计算语义。

Flink会在程序执行时获取并保存分布式流处理管道(Pipeline)中的所有的状态以及整个作业图中 算子获取到数据时的状态。

Flink提供了不同的状态机制,用于指定状态的存储方式和存储位置,按照是否由Flink进行管理分为以下两类

托管状态(Managed State)

原生状态(Raw State)

管理方式

由 Flink Runtime 自动管理、会自动存储与恢复状态、Flink对齐进行了优化

由用户自行管理、需要用户自行序列化

数据结构

ValueState、ListState、MapState ........

Byte[] 自己实现

使用场景

绝大多数用它就够

左边那位用不了的时候才考虑

Maneged State

Managed State由Flink Runtime管理,自动存储,自动恢复,在内存上有做优化

根据数据集是否要按照Key进行分区,由将状态分为 Keyed State 和 Operator State(也有人叫Non-Keyed State,一回事)

Keyed State

Operator State

适用算子

只适用于KeyesStream上的算子

可用于所有算子

状态分配

每个Key对应一个状态

一个算子的一个子任务对应一个状态

创建和扩展方式

重写Rich Function,通过里面的RuntimeContext去访问

实现ChckopintedFunction等结构接口

横向扩展

状态将随着Key在多个算子之间迁移

状态的分配方式多样

支持的数据结构

ValueState、ListState、MapState.......

ListState、BroadcastState.....

(表格来自知乎@PP鲁)

KeyedState

Keyed State 区分不同的Key的数据进行状态存储和管理,每个Key对应一个状态对象

通过keyBy()算子会将数据流进行状态分区,Keyed State被进一步组织成Keyed Groups

一个Keyed Groups包含多个Key的状态,是用于重新分配Keyed State的原子单位`

支持的数据类型

  • ValueState:保存一个具体的,可以对其进行更新查询,该值与数据数据的Key对应,可以使用update(T)方法更新值,value()方法获取值,clear()清除值

  • ListState:保存一个数据列表,通过add(T)或addAll(List)向列表添加数据;可以使用gat()方法得到一个可迭代的(Iterable),可用于遍历列表;可使用update(List)覆盖更新列表

  • ReducingState:保存一个聚合的单值,可以通过add(T)添加元素,但添加元素时,做的操作则是将该元素丢给用户自定义的ReduceFunction,然后将ReduseFunction的返回结果用于更新状态报错的单值,所以与前者不同的是,前者保存的是一个集合,而ReducingState保存的是一聚合结果

  • AggregatingState<IN,OUT>:保存一个聚合的单值,与前者不同,它允许状态的聚合结果与添加到状态的元素不同,同样适用add添加,原理与前者一样。

  • MapState<UK,UV>:保存为一个键值对数据,可以使用put(UK,UV)/ putAll(Map<UK,UV>)方法添加/更新数据到状态,也可使用gey(UK)获取相应的数据

上面几个数据类型均可使用clear()方法清除状态

以ValueState举例

相同Key达到两次,求出其平均值,抛给下游并清空状态

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object test{
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromElements(
      (1,3),
      (2,5),
      (1,7),
      (2,3),
      (1,2)     //最后这条数据由于不足2 ,不会被计数
    )

    dataStream
      .keyBy(_._1)
      .flatMap(new MyFlatMapFunction())
      .print()

    env.execute()
  }
}
//自定义一个FlatMap
class MyFlatMapFunction extends RichFlatMapFunction[(Int,Int),(Int,Int)] {
  //声明个变量,用于存放状态
  private var sum:ValueState[(Int,Int)] = _
  override def open(parameters: Configuration): Unit = {
    //从上下文拿到状态
    this.sum = getRuntimeContext.getState(    //getRuntimeContext可以得到该函数实例的上下文信息
      //ValueState 使用ValueStateDescriptor 创建
      new ValueStateDescriptor[(Int,Int)]("score",createTypeInformation[(Int,Int)])
    )
  }

  override def flatMap(in: (Int,Int), collector: Collector[(Int,Int)]): Unit = {
    //先通过value()方法得到状态值
    val stateValue= sum.value()
    //初始化,如果状态值为空,就创建一个初始值,否者使用状态值
    val currentSum = if (stateValue != null) stateValue else (0,0)
    //(计数,求和)
    val newSum = (currentSum._1 + 1 , currentSum._2 + in._2)
    //使用update()方法更新状态
    this.sum.update(newSum)
    //如果计数器达到2,将平均值丢出去并清空状态
    if(newSum._1 >= 2){
      collector.collect((in._1 , newSum._2 / newSum._1))
      this.sum.clear()
    }
  }
}

运行结果:

(1,5)
(2,4)

再以MapState举例

使用MapState保存中间结果,计算分组的和

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object test{
  def main(args: Array[String]):Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromElements(
      ("Java",1),
      ("Python",3),
      ("Java",2),
      ("Scala",2),
      ("Python",1),
      ("Java",1),
      ("Scala",2)
    )

    dataStream
      .keyBy(_._1)        //keyBy分组
      //MapState还是要从上下文获取,所以还需要实现一个富函数
      .flatMap(new MyFlatMapFunc)
      .print("求和")

    env.execute()
  }
}

class MyFlatMapFunc extends RichFlatMapFunction[(String,Int),(String,Int)] {
  private var state:MapState[String, Int] = _

  //通过生命周期函数获取MapState
  override def open(parameters: Configuration): Unit = {
    //创建描述器
    val stateDesc = new MapStateDescriptor[String,Int](
      "sumMap",
      classOf[String],
      classOf[Int]
    )
    //传入描述器得到状态
    this.state = getRuntimeContext.getMapState(stateDesc)
  }

  //使用MapState获取历史结果
  override def flatMap(in: (String, Int), collector: Collector[(String, Int)]): Unit = {
    //获取新数据的key
    val key:String = in._1
    //通过get(UK)去State中拿历史结果
    val stateValue: Int = this.state.get(key)
    //通过put(UK,UV)添加/更新数据到
    this.state.put(key,stateValue + in._2)

    collector.collect((key,this.state.get(key)))
  }
}

运行结果:

求和> (Java,1)
求和> (Python,3)
求和> (Java,3)
求和> (Scala,2)
求和> (Python,4)
求和> (Java,4)
求和> (Scala,4)

Operator State

与Keyed State不同,Operator State不局限于Keyed算子,所有流入算子的数据都可访问和更新该状态

Flink应用程序的状态的问都是在本地进行,为了保证数据的可恢复性,使用CheckPoint机制将状态数据持久化到存储空间上(具体后面写文章讲),主要逻辑主要有两项,一是将算子任务班底内存数据在CheckPoint时写硬盘(称为snapshot),二是在Flink应用初始化或重启时将永久化的数据读取通过一定逻辑处理后变为算子的本地内存数据(称为restore)。在次过程中,Flink并不能保证数据百分百的一致

为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction接口类。

支持的数据类型

Operator State支持的数据类型有三种

  • ListState

  • UnionListState

  • BroadcastState

使用

Operator State常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。

这里我们来看一个Flink官方提供的Sink案例以了解CheckpointedFunction的工作原理。

// BufferingSink需要继承SinkFunction以实现其Sink功能,同时也要继承CheckpointedFunction接口类
class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  // Operator List State句柄
  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  // 本地缓存
  private val bufferedElements = ListBuffer[(String, Int)]()

  // Sink的核心处理逻辑,将上游数据value输出到外部系统
  override def invoke(value: (String, Int), context: Context): Unit = {
    // 先将上游数据缓存到本地的缓存
    bufferedElements += value
    // 当本地缓存大小到达阈值时,将本地缓存输出到外部系统
    if (bufferedElements.size == threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      // 清空本地缓存
      bufferedElements.clear()
    }
  }

  // 重写CheckpointedFunction中的snapshotState
  // 将本地缓存snapshot保存到存储上
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // 将之前的Checkpoint清理
    checkpointedState.clear()
    // 将最新的数据写到状态中
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  // 重写CheckpointedFunction中的initializeState
  // 初始化状态
  override def initializeState(context: FunctionInitializationContext): Unit = {
    // 注册ListStateDescriptor
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )

    // 从FunctionInitializationContext中获取OperatorStateStore,进而获取ListState
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    // 如果是作业重启,读取存储中的状态数据并填充到本地缓存中
    if(context.isRestored) {
      for(element <- checkpointedState.get()) {
        bufferedElements += element
      }
    }
  }
}

Raw State

所有的Raw State都是Operator State

Raw State这需要用户自己去管理,需要自己序列化,Flink不知道State内的数据是什么结构,用户最终需要把它序列化成可存储的数据结构

后续补上

  • 0