// Register the custom sensor source with the execution environment.
// `env` is defined outside this snippet.
val dataStream = env.addSource(new SensorReaderSource())
/** A single simulated sensor reading.
  *
  * @param id          sensor identifier, e.g. "sensor_1"
  * @param timestamp   value of `System.currentTimeMillis()` when the reading was created
  * @param temperature temperature value of the reading
  */
case class SensorReader(id: String, timestamp: Long, temperature: Double)

/** Custom source: extends `SourceFunction` and implements `run()` / `cancel()`.
  *
  * While running, emits one batch of ten simulated readings (one per sensor)
  * and then sleeps for one second before producing the next batch.
  */
class SensorReaderSource() extends SourceFunction[SensorReader] {

  // Loop flag: read by run(), cleared by cancel(). Marked @volatile because
  // cancel() may be invoked from a different thread than the one executing
  // run(); without it the loop is not guaranteed to observe the update.
  @volatile var running: Boolean = true

  val random = Random

  // Baseline readings for sensor_1 .. sensor_10, each with a temperature in [35, 40).
  // NOTE(review): never reassigned in this class, so every batch is "baseline + noise"
  // rather than a random walk; kept as `var` to preserve the existing public setter.
  var currentTemp: List[SensorReader] =
    1.to(10)
      .map(i => SensorReader("sensor_" + i, System.currentTimeMillis(), random.nextDouble() * 5 + 35))
      .toList

  /** Builds a fresh batch of readings from the baseline.
    *
    * @return one reading per baseline sensor, stamped with the current time and
    *         with Gaussian noise added to the baseline temperature
    */
  def getDataList(): List[SensorReader] =
    currentTemp.map { s =>
      SensorReader(s.id, System.currentTimeMillis(), s.temperature + random.nextGaussian())
    }

  /** Emits a batch of readings through `ctx` once per second until `running` becomes false.
    *
    * @param ctx source context used to emit records
    */
  override def run(ctx: SourceFunction.SourceContext[SensorReader]): Unit =
    while (running) {
      // ctx.collect is used to emit each record.
      getDataList().foreach(data => ctx.collect(data))
      Thread.sleep(1000)
    }

  /** Cancels the source by clearing the flag that keeps the `run()` loop alive. */
  override def cancel(): Unit = {
    running = false
  }
}