// From a collection of elements
val dataStream = env.fromElements(1, 1.1, "hello", true)
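The env used throughout this section is assumed to be an already-created StreamExecutionEnvironment; a minimal setup sketch (not shown in the original snippets):

import org.apache.flink.streaming.api.scala._

// Obtain the streaming environment; the wildcard import also brings in the
// implicit TypeInformation instances that fromElements and the operators below need.
val env = StreamExecutionEnvironment.getExecutionEnvironment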
File

//3 File
val inputPath: String = "/path/to/file"
val dataStream = env.readTextFile(inputPath)
Kafka (important)
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

//4 Kafka
val consumerProperties = new Properties()
consumerProperties.setProperty("bootstrap.servers", "kafka1:9092")
consumerProperties.setProperty("group.id", "flink-stream")
val topic = "flink-source"
val kafkaSource = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), consumerProperties)
val dataStream = env.addSource(kafkaSource)
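The Kafka connector is not part of the core Flink distribution, so the job needs it as an extra dependency. A hedged sbt sketch of what this snippet assumes (universal Kafka connector; flinkVersion is a placeholder to match your cluster):

// build.sbt (sketch) -- streaming Scala API plus the universal Kafka connector
val flinkVersion = "1.12.7" // placeholder, adjust to your Flink version

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)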
Custom Source
Scenario 1: testing
Scenario 2: other data sources
val dataStream = env.addSource(new SensorReaderSource())
import scala.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction

case class SensorReader(id: String, timestamp: Long, temperature: Double)

// Extend SourceFunction and implement run()/cancel()
class SensorReaderSource() extends SourceFunction[SensorReader] {
  var running: Boolean = true
  val random = Random
  // Base readings for 10 sensors, initialized to random temperatures in [35, 40)
  var currentTemp = 1.to(10).map(i => SensorReader("sensor_" + i, System.currentTimeMillis(), random.nextDouble() * 5 + 35)).toList
  // Produce a fresh reading per sensor: same id, current timestamp, base temperature plus Gaussian noise
  def getDataList(): List[SensorReader] =
    currentTemp.map(s => SensorReader(s.id, System.currentTimeMillis(), s.temperature + random.nextGaussian()))
  override def run(ctx: SourceFunction.SourceContext[SensorReader]): Unit =
    while (running) {
      // ctx.collect emits each record downstream
      getDataList().foreach(data => ctx.collect(data))
      Thread.sleep(1000)
    }
  // Stop the source: run() exits its loop on the next iteration
  override def cancel(): Unit = running = false
}
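A minimal usage sketch for the custom source (the print sink and the job name are illustrative assumptions, not part of the original):

// dataStream is the stream created above via env.addSource(new SensorReaderSource())
dataStream.print()
env.execute("sensor-source-demo")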