DataStream → DataStream
Map
Applies a function to each element of the stream.
dataStream.map { x => x * 2 }
For example, we convert each line of a thermometer-gun data stream into a case class instance.
Raw data
id,timestamp,temperature
sensor_4,1609203345495,39.126604003147314
sensor_2,1609203345495,39.42746976779783
sensor_8,1609203345495,38.573523839218375
sensor_6,1609203345495,37.65574402266021
sensor_1,1609203345495,35.97294873181826
sensor_10,1609203345495,35.11395246803552
sensor_5,1609203345495,37.17056374401738
...
Case class
case class SensorReader(id: String, timestamp: Long, temperature: Double)
Map transformation
val dataStream = inputStream
  .map(data => {
    val array = data.split(",")
    SensorReader(array(0), array(1).toLong, array(2).toDouble)
  })
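The parsing step can be tried without a Flink cluster. The following is a minimal sketch in plain Scala, where a `List` stands in for the stream; the object name `MapSketch` and the helper `parse` are illustrative names, not part of the original code or of the Flink API.

```scala
// Plain Scala sketch, no Flink dependency: a List stands in for the stream.
case class SensorReader(id: String, timestamp: Long, temperature: Double)

object MapSketch {
  // Same parsing logic as the map function in the article.
  // Assumes every line has exactly three comma-separated fields.
  def parse(line: String): SensorReader = {
    val fields = line.split(",")
    SensorReader(fields(0), fields(1).toLong, fields(2).toDouble)
  }

  // Two lines taken from the raw data shown in the article.
  val lines: List[String] = List(
    "sensor_4,1609203345495,39.126604003147314",
    "sensor_1,1609203345495,35.97294873181826"
  )

  // map produces exactly one output element per input element.
  val readings: List[SensorReader] = lines.map(parse)
}
```

A malformed line would make `toLong` or `toDouble` throw, so a real job would likely need validation before this step.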
Filter
Keeps only the elements for which the predicate returns true.
dataStream.filter { _ != 0 }
FlatMap
Maps each element to zero or more elements and flattens the results into a single stream.
dataStream.flatMap { str => str.split(" ") }
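To make the difference between Filter and FlatMap concrete, here is a small sketch using plain Scala collections as a stand-in for streams. The sample values and the object name `FilterFlatMapSketch` are made up for illustration.

```scala
// Plain Scala sketch, no Flink dependency: Lists stand in for streams.
object FilterFlatMapSketch {
  // Illustrative input, not taken from the article.
  val numbers: List[Int] = List(0, 3, 0, 7)

  // filter: an element is kept only if the predicate is true for it.
  val nonZero: List[Int] = numbers.filter(_ != 0)

  // Illustrative input, not taken from the article.
  val sentences: List[String] = List("to be", "or not to be")

  // flatMap: each sentence yields several words, and the per-sentence
  // results are flattened into one sequence of words.
  val words: List[String] = sentences.flatMap(str => str.split(" ").toList)
}
```

Filter can only shrink the number of elements, while FlatMap may produce fewer, the same, or more elements than it receives.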
DataStream* → DataStream
Union
Merges multiple streams; all of them must have the same element type.
dataStream.union(otherStream1, otherStream2, ...)
val unionStream = highStream.union(normalStream)
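As a rough model of Union, the sketch below concatenates plain Scala lists. It is only an approximation: list concatenation has a fixed order, whereas the interleaving of elements from different streams should not be assumed. The object name `UnionSketch` is illustrative.

```scala
// Plain Scala sketch, no Flink dependency: Lists stand in for streams.
case class SensorReader(id: String, timestamp: Long, temperature: Double)

object UnionSketch {
  // Readings taken from the raw data shown in the article.
  val highStream: List[SensorReader] =
    List(SensorReader("sensor_4", 1609203345495L, 39.126604003147314))
  val normalStream: List[SensorReader] =
    List(SensorReader("sensor_1", 1609203345495L, 35.97294873181826))

  // Both inputs have the same element type, so they can be merged.
  val unionStream: List[SensorReader] = highStream ++ normalStream

  // Union does not deduplicate: merging a stream with itself
  // yields every element twice.
  val selfUnion: List[SensorReader] = highStream ++ highStream
}
```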
DataStream → KeyedStream
KeyBy
Partitions the stream by key.
val aggStream = dataStream
  .keyBy(_.id)
  .minBy("temperature")
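The effect of keying followed by a rolling `minBy` can be modelled in plain Scala. The sketch below keeps one "best so far" record per key and emits a result for every incoming element. The readings, the object name `KeyedMinBySketch`, and the helper `keyedRollingMinBy` are all hypothetical and only approximate what the Flink job does.

```scala
// Plain Scala sketch, no Flink dependency: a List stands in for the stream.
case class SensorReader(id: String, timestamp: Long, temperature: Double)

object KeyedMinBySketch {
  // Illustrative readings (timestamps and temperatures are made up).
  val readings: List[SensorReader] = List(
    SensorReader("sensor_1", 1L, 36.5),
    SensorReader("sensor_2", 1L, 38.0),
    SensorReader("sensor_1", 2L, 35.9),
    SensorReader("sensor_2", 2L, 38.4)
  )

  // Models keyBy(_.id) followed by a rolling minBy on temperature:
  // state holds the coldest record seen so far for each id, and one
  // result is emitted for every input element.
  def keyedRollingMinBy(in: List[SensorReader]): List[SensorReader] = {
    val start = (Map.empty[String, SensorReader], Vector.empty[SensorReader])
    val (_, emitted) = in.foldLeft(start) { case ((state, out), next) =>
      val best = state.get(next.id) match {
        // On a tie the earlier record is kept (a choice made for this sketch).
        case Some(current) if current.temperature <= next.temperature => current
        case _ => next
      }
      (state.updated(next.id, best), out :+ best)
    }
    emitted.toList
  }

  val aggregated: List[SensorReader] = keyedRollingMinBy(readings)
}
```

Each key is aggregated independently, so a new low for `sensor_1` never affects the result reported for `sensor_2`.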
KeyedStream → DataStream
Aggregations
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
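The difference between `min` and `minBy` is easy to miss: `min` tracks the minimum value of the field, while `minBy` returns the element that holds that minimum. The following plain Scala sketch models both as rolling aggregations over a single key. The readings and helper names are hypothetical, and the non-aggregated fields of the `min` result are modelled here as those of the first record, which is an assumption of this sketch rather than a guarantee.

```scala
// Plain Scala sketch, no Flink dependency: a List stands in for one key's stream.
case class SensorReader(id: String, timestamp: Long, temperature: Double)

object MinVsMinBySketch {
  // Illustrative readings for a single key (values are made up).
  val readings: List[SensorReader] = List(
    SensorReader("sensor_1", 1L, 36.5),
    SensorReader("sensor_1", 2L, 35.9),
    SensorReader("sensor_1", 3L, 37.2)
  )

  // Rolling minBy: emits the whole record with the lowest temperature so far.
  def rollingMinBy(in: List[SensorReader]): List[SensorReader] = in match {
    case Nil => Nil
    case head :: tail =>
      tail.scanLeft(head)((best, next) =>
        if (next.temperature < best.temperature) next else best)
  }

  // Rolling min: only the temperature field is updated. The other fields
  // stay as in the first record (an assumption of this sketch).
  def rollingMin(in: List[SensorReader]): List[SensorReader] = in match {
    case Nil => Nil
    case head :: tail =>
      tail.scanLeft(head)((acc, next) =>
        acc.copy(temperature = math.min(acc.temperature, next.temperature)))
  }
}
```

In this model both functions report the same temperatures, but only `rollingMinBy` reports the timestamp at which the minimum was actually measured.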
DataStream,DataStream → ConnectedStreams
Connect
Connects streams. Only two streams can be connected, but their element types may differ.
val someStream: DataStream[Int] = ...
val otherStream: DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
val warnStream = highStream.map(data => (data.id, data.temperature))
val connectedStream = warnStream.connect(normalStream)
val coMapResultStream = connectedStream
  .map(
    warnData => (warnData._1, warnData._2, "warning"),
    normalData => (normalData.id, "healthy")
  )
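One way to picture a connected stream is as a sequence whose elements are tagged with the input they came from, with one mapping function per input. The plain Scala sketch below uses `Either` for the tag. This is only a mental model, not how Flink represents connected streams, and the names `ConnectSketch` and `Tagged` are illustrative.

```scala
// Plain Scala sketch, no Flink dependency: Lists stand in for streams.
case class SensorReader(id: String, timestamp: Long, temperature: Double)

object ConnectSketch {
  // Left = element from the first input, Right = element from the second.
  type Tagged = Either[(String, Double), SensorReader]

  // Values taken from the raw data shown in the article.
  val warnStream: List[(String, Double)] =
    List(("sensor_4", 39.126604003147314))
  val normalStream: List[SensorReader] =
    List(SensorReader("sensor_1", 1609203345495L, 35.97294873181826))

  // The two inputs keep their own element types after being connected.
  val connected: List[Tagged] =
    warnStream.map(w => Left(w): Tagged) ++ normalStream.map(n => Right(n): Tagged)

  // Co-map: one function per input, chosen by where the element came from.
  val coMapped: List[Product] = connected.map {
    case Left((id, temperature)) => (id, temperature, "warning")
    case Right(reading)          => (reading.id, "healthy")
  }
}
```

Because the two functions return tuples of different arity, the combined result in this sketch only has the common supertype `Product`.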
Side Outputs
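Side outputs are used to split a stream.
Suppose we have a batch of body-temperature readings from thermometer guns, and we split them by temperature into high readings (above 37°) and normal readings (37° or below).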
val highTag = OutputTag[SensorReader]("high-temperature")
val normalTag = OutputTag[SensorReader]("normal-temperature")

val allStream = dataStream.process(
  (value, ctx: ProcessFunction[SensorReader, SensorReader]#Context, out: Collector[SensorReader]) => {
    out.collect(value)
    if (value.temperature > 37) {
      ctx.output(highTag, value)
    } else {
      ctx.output(normalTag, value)
    }
  }
)
val highStream = allStream.getSideOutput(highTag)
val normalStream = allStream.getSideOutput(normalTag)

highStream.print("high")
normalStream.print("normal")
allStream.print("all")
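The routing logic of the process function can be checked in isolation with plain Scala. In the sketch below the main output receives every element and the two side outputs are modelled with `partition`. The object name `SideOutputSketch` and the `sensor_x` boundary reading are hypothetical additions for illustration.

```scala
// Plain Scala sketch, no Flink dependency: Lists stand in for streams.
case class SensorReader(id: String, timestamp: Long, temperature: Double)

object SideOutputSketch {
  val readings: List[SensorReader] = List(
    // Taken from the raw data shown in the article.
    SensorReader("sensor_4", 1609203345495L, 39.126604003147314),
    SensorReader("sensor_1", 1609203345495L, 35.97294873181826),
    // Hypothetical boundary reading, added to show where exactly 37 goes.
    SensorReader("sensor_x", 1609203345495L, 37.0)
  )

  // Main output: the process function collects every element.
  val allStream: List[SensorReader] = readings

  // Side outputs: strictly above 37 is "high", everything else is "normal".
  val (highStream, normalStream) = readings.partition(_.temperature > 37)
}
```

Note that a reading of exactly 37 lands in the normal output, because the condition uses a strict `>` comparison.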
References
Flink Operators
Flink Side Outputs
Source code
The source code is on GitHub; see flink-scala.
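Post title: Flink-DataStream-Transformations
Author: Shea
Original link: https://di1shuai.com/Flink数据转换-Transformation.html
Copyright notice: Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 3.0 CN. Please credit the source when reposting!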