• SparkStreaming的foreachPartition理解


    首先我们要知道:

    foreachRDD 作用于 DStream中每一个时间间隔的 RDD

    foreachPartition 作用于每一个时间间隔的RDD中的每一个 partition

    foreach 作用于每一个时间间隔的 RDD 中的每一个元素。

    foreachRDD是作用在driver端的一种最常见的输出方式,而其他都作用在executor端。

    所以注意!在driver端建立类似连接的话(或者想作为共享变量的对象)

    rdd是获取不到的!这就是为什么我在向hbase写DStream的时候迟迟写不进去的原因!

    举例:

    1. //错误写法
    2. dstream.foreachRDD { rdd =>
    3. val connection = createNewConnection() // executed at the driver
    4. rdd.foreach { record =>
    5. connection.send(record) // executed at the worker
    6. }
    7. }

    当然 我们接下来肯定是想着把连接写到foreach里面解决问题的,但是这样容易炸!

    因为每次遍历RDD的时候都会产生一个连接 创建连接和关闭连接都很频繁 造成系统不必要的开销

    于是我们就用foreachPartition解决问题!

    1. // 使用foreachPartitoin来减少连接的创建,RDD的每个partition创建一个链接
    2. dstream.foreachRDD { rdd =>
    3. rdd.foreachPartition { partitionOfRecords =>
    4. val connection = createNewConnection()
    5. partitionOfRecords.foreach(record => connection.send(record))
    6. connection.close()
    7. }
    8. }

    还有优化手段 因为分区过多的话连接数也会变多 于是还可以用线程池

    1. // 使用静态连接池,可以增加连接的复用、减少连接的创建和关闭。
    2. dstream.foreachRDD { rdd =>
    3. rdd.foreachPartition { partitionOfRecords =>
    4. // ConnectionPool is a static, lazily initialized pool of connections
    5. val connection = ConnectionPool.getConnection()
    6. partitionOfRecords.foreach(record => connection.send(record))
    7. ConnectionPool.returnConnection(connection) // return to the pool for future reuse
    8. }
    9. }

  • 相关阅读:
    ALP300智能型低压马达保护器
    1011 World Cup Betting
    前端面试宝典React篇17 如何写一份大厂 HR 满意的简历?
    太强啦!!!ChatGPT 能上传文件了,能执行 Python 代码啦!
    Vue源码之数据响应式
    Redis布隆过滤器
    网络基础1:网络初始与网络套接字通信
    一款GoFrame+Vue+ElementUI后台管理框架
    【QT-lineEidte动画效果
    多线程-线程与进程、线程的实现方式(第十八天)
  • 原文地址:https://blog.csdn.net/weixin_51981189/article/details/128136812