• 并发编程-线程池ForkJoinPool


    ForkJoinPool

    算法题:如何充分利用多核CPU的性能,快速对一个2千万大小的数组进行排序?

    分治思想:分解 求解 合并

    分治思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解。

    分治思想的步骤如下:

    1、分解:将要解决的问题划分成若干规模较小的同类问题;(子问题不能无限小,所以通常会设置阈值)

    2、求解:当子问题划分得足够小时,用较简单的方法解决;

    3、合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。

    计算机十大经典算法中的归并排序、快速排序、二分查找都是基于分治思想实现的算法

    归并排序(Merge Sort)演示:Comparison Sorting Visualization

    扩展:Arrays.sort() 非常高效的算法,快速排序,并且进行了优化

    1. /**
    2. * Sorts the specified array into ascending numerical order.
    3. *
    4. *

      Implementation note: The sorting algorithm is a Dual-Pivot Quicksort

    5. * by Vladimir Yaroslavskiy, Jon Bentley, and Joshua Bloch. This algorithm
    6. * offers O(n log(n)) performance on many data sets that cause other
    7. * quicksorts to degrade to quadratic performance, and is typically
    8. * faster than traditional (one-pivot) Quicksort implementations.
    9. *
    10. * @param a the array to be sorted
    11. */
    12. public static void sort(int[] a) {
    13. DualPivotQuicksort.sort(a, 0, a.length - 1, null, 0, 0);
    14. }
    15. /**
    16. * Sorts the specified array into ascending numerical order.
    17. *
    18. * @implNote The sorting algorithm is a parallel sort-merge that breaks the
    19. * array into sub-arrays that are themselves sorted and then merged. When
    20. * the sub-array length reaches a minimum granularity, the sub-array is
    21. * sorted using the appropriate {@link Arrays#sort(byte[]) Arrays.sort}
    22. * method. If the length of the specified array is less than the minimum
    23. * granularity, then it is sorted using the appropriate {@link
    24. * Arrays#sort(byte[]) Arrays.sort} method. The algorithm requires a
    25. * working space no greater than the size of the original array. The
    26. * {@link ForkJoinPool#commonPool() ForkJoin common pool} is used to
    27. * execute any parallel tasks.
    28. *
    29. * @param a the array to be sorted
    30. *
    31. * @since 1.8
    32. */
    33. public static void parallelSort(byte[] a) {
    34. int n = a.length, p, g;
    35. if (n <= MIN_ARRAY_SORT_GRAN ||
    36. (p = ForkJoinPool.getCommonPoolParallelism()) == 1)
    37. DualPivotQuicksort.sort(a, 0, n - 1);
    38. else
    39. new ArraysParallelSortHelpers.FJByte.Sorter
    40. (null, a, new byte[n], 0, n, 0,
    41. ((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ?
    42. MIN_ARRAY_SORT_GRAN : g).invoke();
    43. }

    基于归并排序算法实现

    对于大小为2千万的数组进行快速排序,可以使用高效的归并排序算法来实现。

    什么是归并排序

    归并排序(Merge Sort)是一种基于分治思想的排序算法。归并排序的基本思想是将一个大数组分成两个相等大小的子数组,对每个子数组分别进行排序,然后将两个子数组合并成一个有序的大数组。因为常常使用递归实现(由先拆分后合并的性质决定的),所以我们称其为归并排序。

    归并排序的步骤包括:

    • 将数组分成两个子数组

    • 对每个子数组进行排序

    • 合并两个有序的子数组(合并后仍是有序的,用双指针排序保证)

    归并排序的时间复杂度为O(nlogn),空间复杂度为O(n),其中n为数组的长度。

    使用归并排序实现上面的算法题

    单线程实现归并排序

    单线程归并算法的实现:将序列分成两个部分,分别进行递归排序,然后将排序好的子序列合并起来。

    1. public class MergeSort
    2. {
    3. /** 要排序的数组 */
    4. private final int[] arrayToSort;
    5. /** 拆分的阈值,低于此阈值就不再进行拆分 */
    6. private final int threshold;
    7. public MergeSort(final int[] arrayToSort, final int threshold)
    8. {
    9. this.arrayToSort = arrayToSort;
    10. this.threshold = threshold;
    11. }
    12. /**
    13. * 排序
    14. */
    15. public int[] mergeSort()
    16. {
    17. return mergeSort(arrayToSort, threshold);
    18. }
    19. public static int[] mergeSort(final int[] arrayToSort, int threshold)
    20. {
    21. // 拆分后的数组长度小于阈值,直接进行排序
    22. if (arrayToSort.length < threshold)
    23. {
    24. // 调用jdk提供的排序方法
    25. Arrays.sort(arrayToSort);
    26. return arrayToSort;
    27. }
    28. int midpoint = arrayToSort.length / 2;
    29. // 对数组进行拆分
    30. int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
    31. int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
    32. // 递归调用
    33. leftArray = mergeSort(leftArray, threshold);
    34. rightArray = mergeSort(rightArray, threshold);
    35. // 合并排序结果
    36. return merge(leftArray, rightArray);
    37. }
    38. public static int[] merge(final int[] leftArray, final int[] rightArray)
    39. {
    40. // 定义用于合并结果的数组
    41. int[] mergedArray = new int[leftArray.length + rightArray.length];
    42. int mergedArrayPos = 0;
    43. // 利用双指针进行两个数的比较
    44. int leftArrayPos = 0;
    45. int rightArrayPos = 0;
    46. while (leftArrayPos < leftArray.length && rightArrayPos < rightArray.length)
    47. {
    48. if (leftArray[leftArrayPos] <= rightArray[rightArrayPos])
    49. {
    50. mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
    51. leftArrayPos++;
    52. }
    53. else
    54. {
    55. mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
    56. rightArrayPos++;
    57. }
    58. mergedArrayPos++;
    59. }
    60. while (leftArrayPos < leftArray.length)
    61. {
    62. mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
    63. leftArrayPos++;
    64. mergedArrayPos++;
    65. }
    66. while (rightArrayPos < rightArray.length)
    67. {
    68. mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
    69. rightArrayPos++;
    70. mergedArrayPos++;
    71. }
    72. return mergedArray;
    73. }
    74. public static void main(String[] args)
    75. {
    76. // 生成测试数组 用于归并排序
    77. int[] arrayToSortByMergeSort = Utils.buildRandomIntArray(20000000);
    78. // 获取处理器数量
    79. int processors = Runtime.getRuntime().availableProcessors();
    80. MergeSort mergeSort = new MergeSort(arrayToSortByMergeSort, processors);
    81. long startTime = System.nanoTime();
    82. // 归并排序
    83. mergeSort.mergeSort();
    84. long duration = System.nanoTime() - startTime;
    85. System.out.println("单线程归并排序时间: " + (duration / (1000f * 1000f)) + "毫秒");
    86. }
    87. }
    88. /**
    89. * 随机生成数组工具类
    90. */
    91. public class Utils
    92. {
    93. public static int[] buildRandomIntArray(final int size)
    94. {
    95. int[] arrayToCalculateSumOf = new int[size];
    96. Random generator = new Random();
    97. for (int i = 0; i < arrayToCalculateSumOf.length; i++)
    98. {
    99. arrayToCalculateSumOf[i] = generator.nextInt(100000000);
    100. }
    101. return arrayToCalculateSumOf;
    102. }
    103. }
    Fork/Join并行归并排序

    并行归并排序是一种利用多线程实现的归并排序算法。基本思路:将数据分成若干部分,然后在不同线程上对这些部分进行归并排序,最后将排好序的部分合并成有序数组。在多核CPU上,这种算法也能够有效提高排序速度。

    可以使用Java的Fork/Join框架来实现归并排序的并行化:

    1. /**
    2. * 利用fork-join实现数组排序
    3. */
    4. public class MergeSortTask extends RecursiveAction
    5. {
    6. /** 拆分的阈值,低于此阈值就不再进行拆分 */
    7. private final int threshold;
    8. /** 要排序的数组 */
    9. private int[] arrayToSort;
    10. public MergeSortTask(final int[] arrayToSort, final int threshold)
    11. {
    12. this.arrayToSort = arrayToSort;
    13. this.threshold = threshold;
    14. }
    15. @Override
    16. protected void compute()
    17. {
    18. // 拆分后的数组长度小于阈值,直接进行排序
    19. if (arrayToSort.length <= threshold)
    20. {
    21. // 调用jdk提供的排序方法
    22. Arrays.sort(arrayToSort);
    23. return;
    24. }
    25. // 对数组进行拆分
    26. int midpoint = arrayToSort.length / 2;
    27. int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
    28. int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
    29. MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);
    30. MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);
    31. // 方案一:调用任务,阻塞当前线程,直到所有子任务执行完成
    32. invokeAll(leftTask, rightTask);
    33. // 方案二:包括提交任务fork和合并结果join
    34. // leftTask.fork();
    35. // rightTask.fork();
    36. // leftTask.join();
    37. // rightTask.join();
    38. // 注意:方案一和方案二可以替换,方案一的优势在于可以充分利用CPU的多核能力
    39. // 合并排序结果
    40. arrayToSort = MergeSort.merge(leftTask.getSortedArray(), rightTask.getSortedArray());
    41. }
    42. public int[] getSortedArray()
    43. {
    44. return arrayToSort;
    45. }
    46. public static void main(String[] args)
    47. {
    48. // 生成测试数组 用于归并排序
    49. int[] arrayToSortByMergeSort = Utils.buildRandomIntArray(20000000);
    50. // 生成测试数组 用于forkjoin排序
    51. int[] arrayToSortByForkJoin = Arrays.copyOf(arrayToSortByMergeSort, arrayToSortByMergeSort.length);
    52. // 获取处理器数量
    53. int processors = Runtime.getRuntime().availableProcessors();
    54. // 利用forkjoin排序
    55. MergeSortTask mergeSortTask = new MergeSortTask(arrayToSortByForkJoin, processors);
    56. // 构建forkjoin线程池
    57. ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
    58. long startTime = System.nanoTime();
    59. // 执行排序任务
    60. forkJoinPool.invoke(mergeSortTask);
    61. long duration = System.nanoTime() - startTime;
    62. System.out.println("forkjoin排序时间: " + (duration / (1000f * 1000f)) + "毫秒");
    63. }
    64. }

    对比输出结果:

    1. 单线程归并排序时间: 3411.657毫秒
    2. forkjoin排序时间: 1744.5936毫秒

    总结:数组越大,利用Fork/Join框架实现的并行化归并排序比单线程归并排序的效率越高

    并行实现归并排序的优化和注意事项
    • 任务的大小:任务大小的选择会影响并行算法的效率和负载均衡。如果任务太小,会造成任务划分和合并的开销过大;如果任务太大,会导致任务无法充分利用多核CPU并行处理能力。因此,在实际应用中需要根据数据量、CPU核心数等因素选择合适的任务大小。

    • 负载均衡:并行算法需要保证负载均衡。各个线程执行的任务大小和时间应该尽可能相等,否则会导致某些线程负载过重,而其他线程负载过轻的情况。在归并排序中,可以通过递归调用实现负载均衡,但是需要注意递归的层数不能太深,否则会导致任务划分和合并的开销过大。

    • 数据分布:数据分布的均匀性也会影响并行算法的效率和负载均衡。在归并排序中,如果数据分布不均匀,会导致某些线程处理的数据量过大,而其他线程处理的数据量过小的情况。因此,在实际应用中需要考虑数据的分布情况,尽可能将数据分成大小相等的子数组。

    • 内存使用:并行算法需要考虑内存的使用情况。特别是在处理大规模数据时,内存的使用情况会对算法的执行效率产生重要影响。在归并排序中,可以通过对数据进行原地归并实现内存的节约,但是需要注意归并的实现方式,以避免数据的覆盖和不稳定排序等问题。

    • 线程切换:线程切换是并行算法的一个重要开销。需要尽量减少线程的切换次数,以提高算法的执行效率。在归并排序中,可以通过设置线程池的大小和调整任务大小等方式控制线程的数量和切换开销,以实现算法的最优性能。

    Fork/Join框架介绍

  • 相关阅读:
    vue实现div拖拽
    【ESD专题】案例 :静电放电导致产品重启或死机
    【HTML】HTML网页设计--智能养老系统前端
    传参值为空时不传字段
    hadoop2.2.0开机启动的后台服务脚本(请结合上一篇学习)
    Redis概述及常用数据类型
    Window 2016 + VMWare +Thingworx 8.5 安装总结
    Python学习基础笔记十五——命名空间和作用域
    C++ Reference: Standard C++ Library reference: C Library: cstdio: fread
    nuxt 路由 & 动态路由配置
  • 原文地址:https://blog.csdn.net/weixin_58482311/article/details/133937091