残月的技术日志

残月的技术日志

Flink 数据分流与合流

2023-06-18

数据分流

流数据就像车流,每辆车可不一定都去往同一个目的地,所以,现实中我们可能要对数据进行分流

分流后,数据被分成了主流(我这么叫,不知道有没有错)与侧边流,常常提到的Flink侧边流大致就是这个概念

大概长这样

通常我们使用一定的条件限制侧边流的数据,不然一模一样也没意义是吧

举个例子

我将模拟一个班级学生的成绩,将不及格(<60分)的同学的数据通过侧边流输出,其余同学则照常使用主流输出

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

//个人比较喜欢使用自定义的类型
case class Score(name:String,Score:Float)
object test{
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromElements(
      ("李二狗",86.5f),
      ("张三猫",95.0f),
      ("刘四牛",55.5f)
    ).map(d => {Score(d._1, d._2)})

    //在分流前,我们要为每个侧边流定义一个outputTag,不让它不知道从哪出去
    val noPassTag = new OutputTag[Score]("noPass")

    /** 然后开始正式的分流
      * 使用process方法实现分流
      * 传入org.apache.flink.streaming.api.functions.ProcessFunction的实例
      * 第一个是输入的数据类型,第二个是输出的数据类型
      * */
    val scoreStream = dataStream
      .process(new ProcessFunction[Score,Score] {
        override def processElement(value: Score, ctx: ProcessFunction[Score, Score]#Context, out: Collector[Score]): Unit = {
          //使用if判断条件
          if(value.Score < 60){              //不及格的走这,留下来叫家长。。。。
            ctx.output(noPassTag,value)      //刚刚的标签派上用场了
          }else {
            out.collect(value)
          }
        }
      })

    //使用侧边流的数据
    scoreStream.getSideOutput(noPassTag)
      .print("不及格")

    //主流输出
    scoreStream
      .print("及格")

    env.execute()
  }
}

运行结果:

及格> Score(李二狗,86.5)
及格> Score(张三猫,95.0)
不及格> Score(刘四牛,55.5)

数据合流

先讲一句:Flink中所有的数据合流操作,都无法保证数据的顺序

就像车流,有分就有合

数据的合流分为两种,如下:

Union(聚合) - 简单粗暴

直接交汇多个流,需要求两个流数据类型相同

数据不会去重,若与自己聚合,每个元素将出现两次

举个例子

import org.apache.flink.streaming.api.scala._

object test{
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream1 = env.fromCollection(List(1,2,3,4,5))
    val stream2 = env.fromCollection(List(6,7,8,9,0))

    stream1
      .union(stream2)     //dataStream:DataStream
      .print()

    env.execute()
  }
}

运行结果:

1
2
3
4
5
6
7
8
9
0

Connect(连接) - 灵活

Connect可以连接两个数据类型不同的数据流

但要注意:

  • 返回的将不再是DataStream,而是ConnectedStream

  • ConnectedStream就只能使用map、flatMap,process,keyBy算子

  • Connect只能连接两个流,而union可以是多个流

举个例子

import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala._

object test{
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream1 = env.fromCollection(List(1,2,3,4,5))
    val stream2 = env.fromCollection(List("6","7","8"))

    stream1
      .connect(stream2)
      .map(new MyCoMapFunction)     //注意,这里的Map不能像之前一样直接使用lambda函数了
    .print()

    env.execute()
  }
}

//我们实现一个 CoMapFunction
/*
* org.apache.flink.streaming.api.functions.co.CoMapFunction
* IN1 第一个流的输入(也就是被连接的那个)
* IN2 第二个流的输入(也就是用来连接的那个)
* OUT CoMapFunction实例的输出
* */
class MyCoMapFunction extends CoMapFunction[Int,String,Int] {
  //处理第一个流的数据
  override def map1(value: Int): Int = value
  //处理第二个流的数据
  override def map2(value: String): Int = value.toInt
}

运行结果:

1
6
2
7
3
8
4
5

  • 0