Flink 数据分流与合流
编辑
50
2023-06-18
数据分流
流数据就像车流,每辆车可不一定都去往同一个目的地,所以,现实中我们可能要对数据进行分流
分流后,数据被分成了主流(我这么叫,不知道有没有错)与侧边流,常常提到的Flink侧边流大致就是这个概念
大概长这样
通常我们使用一定的条件限制侧边流的数据,不然一模一样也没意义是吧
举个例子
我将模拟一个班级学生的成绩,将不及格(<60分)的同学的数据通过侧边流输出,其余同学则照常使用主流输出
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
//个人比较喜欢使用自定义的类型
case class Score(name:String,Score:Float)
object test{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataStream = env.fromElements(
("李二狗",86.5f),
("张三猫",95.0f),
("刘四牛",55.5f)
).map(d => {Score(d._1, d._2)})
//在分流前,我们要为每个侧边流定义一个outputTag,不让它不知道从哪出去
val noPassTag = new OutputTag[Score]("noPass")
/** 然后开始正式的分流
* 使用process方法实现分流
* 传入org.apache.flink.streaming.api.functions.ProcessFunction的实例
* 第一个是输入的数据类型,第二个是输出的数据类型
* */
val scoreStream = dataStream
.process(new ProcessFunction[Score,Score] {
override def processElement(value: Score, ctx: ProcessFunction[Score, Score]#Context, out: Collector[Score]): Unit = {
//使用if判断条件
if(value.Score < 60){ //不及格的走这,留下来叫家长。。。。
ctx.output(noPassTag,value) //刚刚的标签派上用场了
}else {
out.collect(value)
}
}
})
//使用侧边流的数据
scoreStream.getSideOutput(noPassTag)
.print("不及格")
//主流输出
scoreStream
.print("及格")
env.execute()
}
}
运行结果:
及格> Score(李二狗,86.5)
及格> Score(张三猫,95.0)
不及格> Score(刘四牛,55.5)
数据合流
先讲一句:Flink中所有的数据合流操作,都
无法保证数据的顺序
就像车流,有分就有合
数据的合流分为两种,如下:
Union(聚合) - 简单粗暴
直接交汇多个
流,需要求两个流数据类型相同
数据不会去重
,若与自己聚合,每个元素将出现两次
举个例子
import org.apache.flink.streaming.api.scala._
object test{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream1 = env.fromCollection(List(1,2,3,4,5))
val stream2 = env.fromCollection(List(6,7,8,9,0))
stream1
.union(stream2) //dataStream:DataStream
.print()
env.execute()
}
}
运行结果:
1
2
3
4
5
6
7
8
9
0
Connect(连接) - 灵活
Connect可以连接两个
数据类型不同
的数据流
但要注意:
返回的将不再是DataStream,而是
ConnectedStream
ConnectedStream就
只能使用
map、flatMap,process,keyBy算子Connect
只能连接两个流
,而union可以是多个流
举个例子
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala._
object test{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream1 = env.fromCollection(List(1,2,3,4,5))
val stream2 = env.fromCollection(List("6","7","8"))
stream1
.connect(stream2)
.map(new MyCoMapFunction) //注意,这里的Map不能像之前一样直接使用lambda函数了
.print()
env.execute()
}
}
//我们实现一个 CoMapFunction
/*
* org.apache.flink.streaming.api.functions.co.CoMapFunction
* IN1 第一个流的输入(也就是被连接的那个)
* IN2 第二个流的输入(也就是用来连接的那个)
* OUT CoMapFunction实例的输出
* */
class MyCoMapFunction extends CoMapFunction[Int,String,Int] {
//处理第一个流的数据
override def map1(value: Int): Int = value
//处理第二个流的数据
override def map2(value: String): Int = value.toInt
}
运行结果:
1
6
2
7
3
8
4
5
- 0
-
赞助
微信支付宝 -
分享