• Spark Streaming状态管理函数


    本帖转载如下地址

    Spark Streaming状态管理函数
    (一)—updateStateByKey和mapWithState
    https://blog.csdn.net/m0_37914799/article/details/84702378
    (二)—updateStateByKey的使用(scala版)
    https://blog.csdn.net/m0_37914799/article/details/84703057
    (三)—MapWithState的使用(scala版)
    https://blog.csdn.net/m0_37914799/article/details/84703854

    结论
      mapWithState它会按照时间线在每一个批次间隔返回之前的发生改变的或者新的key的状态,不发生变化的不返回;同时mapWithState可以不用设置checkpoint,返回的数据量少。而updateStateByKey统计的是全局Key的状态,就算没有数据输入也会在每个批次的时候返回之前的Key的状态,当数据量大时而且使用checkpoint会占用较大的存储。因此,mapWithState性能和效率要比updateStateByKey好。

    mapWithState 官方demo
    https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
     

    mapWithState过期时间设置
    mapWithState(...).timeout(3s))


    时间参数说明:timeout()传入一个时间间隔参数,如果一个key在大于此间隔没有此key的数据流入,则被认为是空闲的。如上设置为3s。


    但实际情况,3s过后发现key并没有过期,也不会被清除,大概30S之后被清除。
    当超时时,状态数据并不会立即删除当前key的数据,而是打上“删除标记”。

    1. override def remove(key: K): Unit = {
    2. val stateInfo = deltaMap(key)
    3. if (stateInfo != null) {
    4. stateInfo.markDeleted() //only marked for deletion
    5. } else {
    6. val newInfo = new StateInfo[S](deleted = true)
    7. deltaMap.update(key, newInfo)
    8. }
    9. }

    如上,超时的events 会收集到 deltaMap中。

    当doFullScan为true的时候,才会触发过期key的清除,updateRecordWithData()负责全面扫描清除过期key。doFullScan的默认值是false。

    1. override def checkpoint(): Unit = {
    2. super.checkpoint()
    3. doFullScan = true
    4. }
    5. ...
    6. removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
    7. ...
    8. // Get the timed out state records, call the mapping function on each and collect the
    9. // data returned
    10. if (removeTimedoutData && timeoutThresholdTime.isDefined) {
    11. ...

    默认情况下,发生10个迭代时,才会清除,因此本例中当我设置窗口为3s时,checkpoint周期就是30s,30s才会清理一次过期key。


    一段英文描述
    a key this current batch timeout then the key have to wait for "doFullScan = true" which means "batchtime*DEFAULT_CHECKPOINT_DURATION_MULTIPLIER(defalut:10)"

  • 相关阅读:
    Django笔记十一之外键查询优化select_related和prefetch_related
    手搭手入门MybaitsX
    docker network网络模式
    架构风格相关内容
    蓝桥杯-回文日期[Java]
    Istio、eBPF 和 RSocket Broker:深入研究服务网格
    21天学习第五天--数组
    Java八股文(Spring Security)
    内网穿透到公网,让你的小伙伴访问你本地的项目
    云原生|kubernetes |使用Prometheus监控k8s cAdvisor篇(一)(centos)
  • 原文地址:https://blog.csdn.net/willyan2007/article/details/126746887