• 【Flink读写外部系统】Flink异步访问外部系统_mysql


    1 在什么场景下使用异步访问系统?

      举个例子,你如果在算子中用到了MapFunction,写你想要对map中的每条处理记录都查询下数据库丰富当前处理记录。那么此时就可以使用异步访问系统

    2 异步访问都解决了哪些问题?

      - 延迟问题。每次请求外部数据库都会带来很多的延迟(如果不采用异步的话MapFunction的大部分时间都是在等待查询结果)
      - 而Flink提供的AsyncFunction(异步)可以有效的降低I/O所带来的延迟问题。

    3 异步是如何解决延迟问题的?

      - 官网是这么说的:“AsyncFunction函数能够同时发出多个查询并对其结果进行异步处理”
      - 我们可以抽象的概括为:传统的请求是在map中每循环一次都会发出一次请求并等待返回结果,然后再进行逻辑计算。如下方左图
      - 但是通过异步的话,类似于是将集合中的数据统一进行一次并发请求,然后读取返回的结果,再进行逻辑计算。如下方右图

    • 优点
      – 1. AsyncFunction与检查点进行了良好的集成,即所有正在等待返回结果的记录都会被写入到检查点并支持在恢复时重新发送请求。

      – 2. AsyncFunction也可以在事件事件处理模式下正确地工作,因为即使允许乱序,它也能够保证记录不会被水位线超过。

    4 Flink提供的异步模式

    1. 在Flink中提供的异步IO的模式,不需要使用map函数阻塞式的加载数据,而是使用异步方法同时处理大量请求。
    2. 不过这就需要数据库支持异步请求,如果不支持异步请求也可以手动维护线程池调用,只不过效率上没有原生的异步client更高效。
    3. 比如Mysql可以通过Vertx支持异步查询(但我翻阅技术博客后发现,有的人说Vertx只支持scala版本2.12及以上的,具体没验证过),HBase2.x也支持异步查询。

    注意:

    • 外部系统最好能提供一个支持异步调用的客户端,很多现有系统都可以做到这点。而如果外部系统只提供了同步客户端,我们可以通过多线程的昂是来发送请求并对其进行处理。
      (如果外部数据源是Mysql,一般的jdbc连接都是同步机制的,因此我们可以使用异步的JDBC组件-Vertx)

    5 Flink异步模式读取Mysql操作-代码

    package flink
    
    import java.sql.{Connection, DriverManager, Statement}
    import java.util.concurrent.Executors
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}
    import org.slf4j.{Logger, LoggerFactory}
    
    import scala.collection.mutable.ListBuffer
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Failure, Success}
    
    class AsyncFunction_ceshi extends RichAsyncFunction[class_infor, (Int, String)] {
      val logger: Logger = LoggerFactory.getLogger(this.getClass)
    
      private lazy val cachingPoolEXECTX = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
      private lazy val direExec = ExecutionContext.fromExecutor(
        org.apache.flink.runtime.concurrent.Executors.directExecutor()
      )
      var pre: Statement = _
      var connect: Connection = _
      //  var result: ResultSet = _
    
      override def open(parameters: Configuration): Unit = {
        Class.forName("com.mysql.jdbc.Driver")
        connect = DriverManager.getConnection("jdbc:mysql://master:3306/friends", "root", "msb_mk")
        pre = connect.createStatement()
      }
    
      override def asyncInvoke(input: class_infor, resultFuture: ResultFuture[(Int, String)]): Unit = {
        val age = input.age
        val name: Future[String] = Future {
          val result = pre.executeQuery(s"select name,age from ceshi where age = $age")
          //      Thread.sleep(2000)
          val name = if (result.next()) result.getString("name") else "UNKNOW"
          name
        }(cachingPoolEXECTX)
        import scala.collection.JavaConverters._
        name.onComplete {
          case Success(value) => resultFuture.complete(ListBuffer((age, value)).asJava)
          //            case Success(value) => resultFuture.complete(Collections.singletonList(age, value))
          case Failure(value) => resultFuture.completeExceptionally(value)
        }(direExec)
      }
    
      override def timeout(input: class_infor, resultFuture: ResultFuture[(Int, String)]): Unit = {
        //在上面的asyncInvoke我自发的让其出现延迟问题;如果出现上述延迟问题则会在本函数中进行处理
        logger.warn("Async function for mysql timeout... ...")
        import scala.collection.JavaConverters._
        resultFuture.complete(ListBuffer((input.age, "超时")).asJava)
      }
    
      override def close(): Unit = {
        try {
          //      result.close()
          pre.close()
          connect.close()
        } catch {
          case e: Exception => e.printStackTrace()
        }
    
      }
    }
    
    
    package flink
    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.datastream.{AsyncDataStream, DataStream, SingleOutputStreamOperator}
    case class class_infor(name: String, age: Int)
    object 异步访问外部系统 {
      def main(args: Array[String]): Unit = {
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
        val streamLocal = StreamExecutionEnvironment.createLocalEnvironment(2)
    
        val data: DataStream[class_infor] = streamLocal.fromElements(class_infor("a", 12), class_infor("b", 2))
        val resultStream: SingleOutputStreamOperator[(Int, String)] = AsyncDataStream.orderedWait(data,
          new AsyncFunction_ceshi(),
          5, TimeUnit.SECONDS,
          30) //最多30个并发请求
        resultStream.print()
        streamLocal.execute("ceshi")
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
  • 相关阅读:
    痞子衡嵌入式:揭秘i.MXRT1060,1010上串行NOR Flash冗余程序启动设计
    Redis全文搜索教程之创建索引并关联源数据
    HTML5-常用标签(二)
    iOS面试准备 - 其他篇
    WordPress CVE-2022-4230复现分析
    Maestro实践
    算法学习的积累
    【PostgreSQL】PostgreSQL 15移除了Stats Collector
    Rational rose 安装教程
    k8s如何强制删除pod&pv&pvc和ns&namespace方法
  • 原文地址:https://blog.csdn.net/qq_33982605/article/details/126944177