• 多线程编程模式-Producer-consumer


    5.生产者消费者Producer-consumer

    避免生产和消费速率的差异,引入一个channel,对二者解耦

    实现producer的时候用到了两阶段模式的AbstractTerminatedThread:

    Channel.java
    1. package comsumerproducer;
    2. /**
    3. * Channel
    4. */
    5. public interface Channel

      {

    6. /**
    7. * 取出一个产品
    8. * @return
    9. * @throws InterruptedException
    10. */
    11. P take() throws InterruptedException;
    12. /**
    13. * 生产一个产品放入通道
    14. * @param product
    15. * @throws InterruptedException
    16. */
    17. void put(P product) throws InterruptedException;
    18. }
    BlockingQueueChannel implements Channel
    1. package comsumerproducer;
    2. import java.util.concurrent.BlockingQueue;
    3. /**
    4. * 基于阻塞队列的通道实现
    5. */
    6. public class BlockingQueueChannel

      implements Channel

      {

    7. private final BlockingQueue

      queue;

    8. public BlockingQueueChannel(BlockingQueue

      queue) {

    9. this.queue = queue;
    10. }
    11. @Override
    12. public P take() throws InterruptedException {
    13. return queue.take();
    14. }
    15. @Override
    16. public void put(P product) throws InterruptedException {
    17. queue.put(product);
    18. }
    19. }
    AttachmentProcessor 角色Producer
    
    1. package comsumerproducer;
    2. import twophased.AbstractTerminatedThread;
    3. import java.io.*;
    4. import java.text.Normalizer;
    5. import java.util.Random;
    6. import java.util.concurrent.ArrayBlockingQueue;
    7. /**
    8. * 模式角色:producer
    9. */
    10. public class AttachmentProcessor {
    11. private final String ATTACHMENT_STORE_BASE_DIR =
    12. "/home/viscent/tmp/attachments/";
    13. /**
    14. * 模式角色:Channel
    15. */
    16. private final Channel channel =
    17. new BlockingQueueChannel(new ArrayBlockingQueue(200));
    18. /**
    19. * 模式角色:Consumer
    20. */
    21. private final AbstractTerminatedThread indexingThread = new AbstractTerminatedThread() {
    22. @Override
    23. protected void doRun() throws Exception {
    24. File file = null;
    25. file = channel.take();
    26. try {
    27. indexFile(file);
    28. } catch (Exception e) {
    29. e.printStackTrace();
    30. } finally {
    31. terminationToken.reservations.decrementAndGet();
    32. }
    33. }
    34. private void indexFile(File file) throws Exception {
    35. /**
    36. * 省略其他代码
    37. */
    38. /**
    39. * 模拟产生索引文件的时间消耗
    40. */
    41. Random random = new Random();
    42. try {
    43. Thread.sleep(random.nextInt(100));
    44. } catch (InterruptedException e) {
    45. ;
    46. }
    47. }
    48. };
    49. public void init() {
    50. indexingThread.start();
    51. }
    52. public void shutdown() {
    53. indexingThread.terminate();
    54. }
    55. public void saveAttachment(InputStream in, String documentId,
    56. String originalFileName) throws Exception {
    57. File file = saveAsFile(in, documentId, originalFileName);
    58. try {
    59. channel.put(file);
    60. } catch (InterruptedException e) {
    61. ;
    62. }
    63. indexingThread.terminationToken.reservations.incrementAndGet();
    64. }
    65. private File saveAsFile(InputStream in, String documentId,
    66. String originalFileName) throws IOException {
    67. String dirName = ATTACHMENT_STORE_BASE_DIR + documentId;
    68. File dir = new File(dirName);
    69. dir.mkdirs();
    70. File file = new File(dirName + "/"
    71. + Normalizer.normalize(originalFileName, Normalizer.Form.NFC));
    72. // 防止目录跨越攻击
    73. if (!dirName.equals(file.getCanonicalFile().getParent())) {
    74. throw new SecurityException("Invalid originalFileName:" + originalFileName);
    75. }
    76. BufferedOutputStream bos = null;
    77. BufferedInputStream bis = new BufferedInputStream(in);
    78. byte[] buf = new byte[2048];
    79. int len = -1;
    80. try {
    81. bos = new BufferedOutputStream(new FileOutputStream(file));
    82. while ((len = bis.read(buf)) > 0) {
    83. bos.write(buf, 0, len);
    84. }
    85. bos.flush();
    86. } finally {
    87. try {
    88. bis.close();
    89. } catch (IOException e) {
    90. ;
    91. }
    92. try {
    93. if (null != bos) {
    94. bos.close();
    95. }
    96. } catch (IOException e) {
    97. ;
    98. }
    99. }
    100. return file;
    101. }
    102. }

     

    通道积压

    消费者处理过慢时会出现通道积压,需要进行处理,分以下两种:

    1.使用有界阻塞队列

    ArrayBlockingQueue和有容量限制的LinkedBlockingQueue

    2.使用带流量控制的无界阻塞队列:

    不带容量控制的LinkedBlockingQueue。借助流量控制实现,对同一时间内可有多少个生产者线程往通道中存储产品进行限制。本例使用基于Semaphore的支持流量控制的实现

    1. package comsumerproducer;
    2. import java.util.concurrent.BlockingQueue;
    3. import java.util.concurrent.Semaphore;
    4. /**
    5. * 基于Semaphore的支持流量控制的通道实现
    6. * @param

    7. */
    8. public class SemaphoreBasedChannel

      implements Channel

      {

    9. private final BlockingQueue

      queue;

    10. private final Semaphore semaphore;
    11. public SemaphoreBasedChannel(BlockingQueue

      queue, int flowLimit) {

    12. this.queue = queue;
    13. this.semaphore = new Semaphore(flowLimit);
    14. }
    15. @Override
    16. public P take() throws InterruptedException {
    17. return queue.take();
    18. }
    19. @Override
    20. public void put(P product) throws InterruptedException {
    21. semaphore.acquire();
    22. try {
    23. queue.put(product);
    24. } finally {
    25. semaphore.release();
    26. }
    27. }
    28. }

    优化的工作窃取算法思想

    producer-consumer中通常channel用queue实现,一个通道可对应一个或多个队列实例。现在本例仅使用一个ArrayBlockingQueue,如果有多个消费者线程从这个queue中获取产品,共享同一个实例。导致锁的竞争。

    如果一个通道实例对应多个队列实例,就可以实现多个消费者线程从通道中取产品时候访问各自的队列实例。

    如果一个消费者从自己的队列中取完任务,可以继续从其他消费者的队列中取出产品进行处理。

    1. package comsumerproducer;
    2. import java.util.concurrent.BlockingDeque;
    3. public interface WorkStealingEnableChannel

      extends Channel

      {

    4. P take(BlockingDeque

      preferredQueue) throws InterruptedException;

    5. }
    1. package comsumerproducer;
    2. import java.util.concurrent.BlockingDeque;
    3. public class WorkStealingChannel implements WorkStealingEnableChannel {
    4. /**
    5. * 受管队列
    6. * @param preferredQueue
    7. * @return
    8. * @throws InterruptedException
    9. */
    10. private final BlockingDeque[] managedQueue;
    11. public WorkStealingChannel(BlockingDeque[] managedQueue) {
    12. this.managedQueue = managedQueue;
    13. }
    14. @Override
    15. public T take(BlockingDeque preferredQueue) throws InterruptedException {
    16. /**
    17. * 优先从指定的受管队列中取产品
    18. */
    19. BlockingDeque targetQueue = preferredQueue;
    20. T product = null;
    21. /**
    22. * 试图从指定的队列队首取"产品"
    23. */
    24. if (null != targetQueue) {
    25. product = targetQueue.poll();
    26. }
    27. int queueIndex = -1;
    28. while (null == product) {
    29. queueIndex = (queueIndex + 1) % managedQueue.length;
    30. targetQueue = managedQueue[queueIndex];
    31. /**
    32. * 试图从其他受管队列的队尾取
    33. */
    34. product = targetQueue.pollLast();
    35. if (preferredQueue == targetQueue) {
    36. break;
    37. }
    38. }
    39. if (null == product) {
    40. /**
    41. * 随机窃取其他受管队列的产品
    42. */
    43. queueIndex = (int) (System.currentTimeMillis() % managedQueue.length);
    44. targetQueue = managedQueue[queueIndex];
    45. product = targetQueue.pollLast();
    46. System.out.println("stealed from " + queueIndex + ":" + product);
    47. }
    48. return product;
    49. }
    50. @Override
    51. public T take() throws InterruptedException {
    52. return take(null);
    53. }
    54. @Override
    55. public void put(T product) throws InterruptedException {
    56. int targetIndex = (product.hashCode() % managedQueue.length);
    57. BlockingDeque targetQueue = managedQueue[targetIndex];
    58. targetQueue.put(product);
    59. }
    60. }
    1. package comsumerproducer;
    2. import twophased.AbstractTerminatedThread;
    3. import twophased.TerminationToken;
    4. import java.util.Random;
    5. import java.util.concurrent.BlockingDeque;
    6. import java.util.concurrent.LinkedBlockingDeque;
    7. /**
    8. * 工作窃取算法
    9. */
    10. public class WorkStealingExample {
    11. private final WorkStealingEnableChannel channel;
    12. private final TerminationToken token = new TerminationToken();
    13. public WorkStealingExample() {
    14. int nCPU = Runtime.getRuntime().availableProcessors();
    15. int consumerCount = nCPU / 2 + 1;
    16. BlockingDeque[] managedQueue = new LinkedBlockingDeque[consumerCount];
    17. /**
    18. * 该通道实例对应了多个queue
    19. */
    20. channel = new WorkStealingChannel(managedQueue);
    21. Consumer[] consumers = new Consumer[consumerCount];
    22. for (int i = 0; i < consumerCount; i++) {
    23. managedQueue[i] = new LinkedBlockingDeque();
    24. consumers[i] = new Consumer(token, managedQueue[i]);
    25. }
    26. for (int i = 0; i < nCPU; i++) {
    27. new Producer().start();
    28. }
    29. for (int i = 0; i < consumerCount; i++) {
    30. consumers[i].start();
    31. }
    32. }
    33. public void doSomething() {
    34. }
    35. public static void main(String[] args) throws InterruptedException {
    36. WorkStealingExample wse = new WorkStealingExample();
    37. wse.doSomething();
    38. Thread.sleep(3500);
    39. }
    40. private class Producer extends AbstractTerminatedThread {
    41. private int i = 0;
    42. @Override
    43. protected void doRun() throws Exception {
    44. channel.put(String.valueOf(i++));
    45. token.reservations.incrementAndGet();
    46. }
    47. }
    48. private class Consumer extends AbstractTerminatedThread {
    49. private final BlockingDeque workQueue;
    50. public Consumer(TerminationToken token, BlockingDeque workQueue) {
    51. super(token);
    52. this.workQueue = workQueue;
    53. }
    54. @Override
    55. protected void doRun() throws Exception {
    56. /**
    57. * WorkStealingEnableChannel接口的take方法实现了工作窃取算法
    58. */
    59. String produce = channel.take(workQueue);
    60. System.out.println("processing product:" + produce);
    61. /**
    62. * 模拟执行真正操作的时间消耗
    63. */
    64. try {
    65. Thread.sleep(new Random().nextInt(50));
    66. } catch (InterruptedException e) {
    67. ;
    68. } finally {
    69. token.reservations.decrementAndGet();
    70. }
    71. }
    72. }
    73. }

  • 相关阅读:
    PHP环境安装
    类加载机制和类加载器
    Json、JDBC
    docker mysql 启动报错
    java虚拟机 JVM问题记录
    【AI赋能医学】基于深度学习和HRV特征的多类别心电图分类
    Linux下提取文件夹下的所有文件名称
    Mysql数据库基础
    CentOS 7 基于C 连接ZooKeeper 客户端
    美创科技入选IDC中国等保合规市场报告推荐厂商
  • 原文地址:https://blog.csdn.net/wanglt311/article/details/126695174