
自定义DataSource有两大类:单线程的DataSource和多线程的DataSource
单线程:继承 SourceFunction
多线程:继承 ParallelSourceFunction,继承 RichParallelSourceFunction(可以有其他的很多操作)
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, RichParallelSourceFunction, SourceFunction}
//1. 单线程
class MyNoParallelSource1 extends SourceFunction[Long] {
var count = 1L;
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while(isRunning) {
ctx.collect(count)
count += 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
//2. 多线程
class MyNoParallelSource2 extends ParallelSourceFunction[Long] {
var count = 1L
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while(isRunning) {
ctx.collect(count)
count += 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
/**3. 多线程使用RichFunction的方式
* 提供了open和close方法,可以用于打开和释放资源
*/
class MyNoParallelSource3 extends RichParallelSourceFunction[Long] {
var count = 1
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while (isRunning) {
ctx.collect(count)
count += 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
override def open(parameters: Configuration): Unit = super.open(parameters)
override def close(): Unit = super.close()
}