• (十七) 共享模型之工具【JUC】【读写锁】


    一、ReentrantReadWriteLock(P247)

    当读操作远远高于写操作时,这时候使用 读写锁 - 可以并发,提高性能。 类似于数据库中的 select ... from ... lock in share mode
    提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
    1. @Slf4j(topic = "c.TestReadWriteLock")
    2. public class TestReadWriteLock {
    3. public static void main(String[] args) throws InterruptedException {
    4. DataContainer dataContainer = new DataContainer();
    5. new Thread(() -> {
    6. dataContainer.read();
    7. }, "t1").start();
    8. new Thread(() -> {
    9. dataContainer.read();
    10. }, "t2").start();
    11. }
    12. }
    13. @Slf4j(topic = "c.DataContainer")
    14. class DataContainer {
    15. private Object data;
    16. private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    17. private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    18. private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
    19. public Object read() {
    20. log.debug("获取读锁...");
    21. r.lock();
    22. try {
    23. log.debug("读取");
    24. sleep(1);
    25. return data;
    26. } finally {
    27. log.debug("释放读锁...");
    28. r.unlock();
    29. }
    30. }
    31. public void write() {
    32. log.debug("获取写锁...");
    33. w.lock();
    34. try {
    35. log.debug("写入");
    36. sleep(1);
    37. } finally {
    38. log.debug("释放写锁...");
    39. w.unlock();
    40. }
    41. }
    42. }

    注意事项
    (1)读锁不支持条件变量。
    (2)重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待

    (3) 重入时降级支持:即持有写锁的情况下去获取读锁。

    二、 * 应用之缓存

    1. 缓存更新策略

    更新时,是先清缓存还是先更新数据库

    2. 读写锁实现一致性缓存

    1. public class TestGenericDao {
    2. public static void main(String[] args) {
    3. GenericDao dao = new GenericDaoCached();
    4. System.out.println("============> 查询");
    5. String sql = "select * from emp where empno = ?";
    6. int empno = 7369;
    7. Emp emp = dao.queryOne(Emp.class, sql, empno);
    8. System.out.println(emp);
    9. emp = dao.queryOne(Emp.class, sql, empno);
    10. System.out.println(emp);
    11. emp = dao.queryOne(Emp.class, sql, empno);
    12. System.out.println(emp);
    13. System.out.println("============> 更新");
    14. dao.update("update emp set sal = ? where empno = ?", 800, empno);
    15. emp = dao.queryOne(Emp.class, sql, empno);
    16. System.out.println(emp);
    17. }
    18. }
    19. class GenericDaoCached extends GenericDao {
    20. private GenericDao dao = new GenericDao();
    21. private Map map = new HashMap<>();
    22. private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    23. @Override
    24. public List queryList(Class beanClass, String sql, Object... args) {
    25. return dao.queryList(beanClass, sql, args);
    26. }
    27. @Override
    28. public T queryOne(Class beanClass, String sql, Object... args) {
    29. // 先从缓存中找,找到直接返回
    30. SqlPair key = new SqlPair(sql, args);;
    31. rw.readLock().lock();
    32. try {
    33. T value = (T) map.get(key);
    34. if(value != null) {
    35. return value;
    36. }
    37. } finally {
    38. rw.readLock().unlock();
    39. }
    40. rw.writeLock().lock();
    41. try {
    42. // 多个线程
    43. T value = (T) map.get(key);
    44. if(value == null) {
    45. // 缓存中没有,查询数据库
    46. value = dao.queryOne(beanClass, sql, args);
    47. map.put(key, value);
    48. }
    49. return value;
    50. } finally {
    51. rw.writeLock().unlock();
    52. }
    53. }
    54. @Override
    55. public int update(String sql, Object... args) {
    56. rw.writeLock().lock();
    57. try {
    58. // 先更新库
    59. int update = dao.update(sql, args);
    60. // 清空缓存
    61. map.clear();
    62. return update;
    63. } finally {
    64. rw.writeLock().unlock();
    65. }
    66. }
    67. class SqlPair {
    68. private String sql;
    69. private Object[] args;
    70. public SqlPair(String sql, Object[] args) {
    71. this.sql = sql;
    72. this.args = args;
    73. }
    74. @Override
    75. public boolean equals(Object o) {
    76. if (this == o) {
    77. return true;
    78. }
    79. if (o == null || getClass() != o.getClass()) {
    80. return false;
    81. }
    82. SqlPair sqlPair = (SqlPair) o;
    83. return Objects.equals(sql, sqlPair.sql) &&
    84. Arrays.equals(args, sqlPair.args);
    85. }
    86. @Override
    87. public int hashCode() {
    88. int result = Objects.hash(sql);
    89. result = 31 * result + Arrays.hashCode(args);
    90. return result;
    91. }
    92. }
    93. }

    注意
    以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑
    1️⃣适合读多写少,如果写操作比较频繁,以上实现性能低
    2️⃣没有考虑缓存容量
    3️⃣没有考虑缓存过期
    4️⃣只适合单机
    5️⃣并发性还是低,目前只会用一把锁
    6️⃣更新方法太过简单粗暴,清空了所有 key (考虑按类型分区或重新设计 key

    三、* 读写锁原理

    1. 图解流程

    读写锁用的是同一个 Sycn 同步器,因此等待队列、 state 等也是同一个

    1.1 t1 w.lockt2 r.lock

    (1)t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位

    (2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败

    tryAcquireShared 返回值表示

    1️⃣-1 表示失败

    2️⃣0 表示成功,但后继节点不会继续唤醒

    3️⃣正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1

    (3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

    (4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁

    (5)如果没有成功,在 doAcquireShared for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() park

    1.2 t3 r.lockt4 w.lock

    这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

    1.3 t1 w.unlock

    2. 源码分析

    四、StampedLock

    该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

    加解读锁

    1. long stamp = lock.readLock();
    2. lock.unlockRead(stamp);

    加解写锁
    1. long stamp = lock.writeLock();
    2. lock.unlockWrite(stamp);
    乐观读, StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
    1. long stamp = lock.tryOptimisticRead();
    2. // 验戳
    3. if(!lock.validate(stamp)){
    4. // 锁升级
    5. }
    提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
    1. @Slf4j(topic = "c.TestStampedLock")
    2. public class TestStampedLock {
    3. public static void main(String[] args) {
    4. DataContainerStamped dataContainer = new DataContainerStamped(1);
    5. new Thread(() -> {
    6. dataContainer.read(1);
    7. }, "t1").start();
    8. sleep(0.5);
    9. new Thread(() -> {
    10. dataContainer.read(0);
    11. }, "t2").start();
    12. }
    13. }
    14. @Slf4j(topic = "c.DataContainerStamped")
    15. class DataContainerStamped {
    16. private int data;
    17. private final StampedLock lock = new StampedLock();
    18. public DataContainerStamped(int data) {
    19. this.data = data;
    20. }
    21. public int read(int readTime) {
    22. long stamp = lock.tryOptimisticRead();
    23. log.debug("optimistic read locking...{}", stamp);
    24. sleep(readTime);
    25. if (lock.validate(stamp)) {
    26. log.debug("read finish...{}, data:{}", stamp, data);
    27. return data;
    28. }
    29. // 锁升级 - 读锁
    30. log.debug("updating to read lock... {}", stamp);
    31. try {
    32. stamp = lock.readLock();
    33. log.debug("read lock {}", stamp);
    34. sleep(readTime);
    35. log.debug("read finish...{}, data:{}", stamp, data);
    36. return data;
    37. } finally {
    38. log.debug("read unlock {}", stamp);
    39. lock.unlockRead(stamp);
    40. }
    41. }
    42. public void write(int newData) {
    43. long stamp = lock.writeLock();
    44. log.debug("write lock {}", stamp);
    45. try {
    46. sleep(2);
    47. this.data = newData;
    48. } finally {
    49. log.debug("write unlock {}", stamp);
    50. lock.unlockWrite(stamp);
    51. }
    52. }
    53. }
    注意
    StampedLock 不支持条件变量
    StampedLock 不支持可重入
  • 相关阅读:
    Naopore基因组数据组装软件---NECAT下载试用
    【Node.JS 】创建基本的web服务器
    可视化工具Netron介绍
    k8s部署实例
    Go:模幂算法(附完整源码)
    linux驱动开发:PWM驱动编写
    BOA服务器移植
    5.3 递归(用带空间图分析,思想很好)
    Go学习第十二章——Go反射与TCP编程
    LeetCode 784.字母大小写全排列
  • 原文地址:https://blog.csdn.net/yirenyuan/article/details/128202371