• MySQL向Es数据同步策略


    哈喽,大家好!目前有有一个小项目功能落到自己手中,也是一个面试必考点。如何保证MySQL与Es、Redis等之间的数据一致性,带着大家的问题,我给提供一种解决方案(最终一致性)

    代码如下:

    1. @Component
    2. @Slf4j
    3. @EnableAsync
    4. @AllArgsConstructor
    5. public class EsSynchronizeTasksJob {
    6. private final ISysUserAllESService sysUserAllESService;
    7. private final SysUserAllESDocMapper userAllESDocMapper;
    8. private final SysPositRequestDocMapper sysPositRequestDocMapper;
    9. private final ISysPositRequestService sysPositRequestService;
    10. // @Scheduled(fixedRate = 10000)
    11. // @Scheduled(cron = "0 0 3 * * ?")
    12. @Async("pushMysqlToEsExecutor")
    13. @Transactional(rollbackFor = Exception.class)
    14. public void syncDataToEsBySysUserAllES() throws Exception {
    15. log.info("<----异步执行数据同步开始---->");
    16. // 获取系统时间
    17. Long startTime = System.currentTimeMillis();
    18. process(sysPositRequestService, sysPositRequestDocMapper);
    19. process(sysUserAllESService, userAllESDocMapper);
    20. Long endTime = System.currentTimeMillis();
    21. log.info("<----异步执行数据同步结束---->");
    22. log.info("<----异步执行数据同步耗时---->" + (endTime - startTime) + "ms");
    23. }
    24. private extends BaseEntity, S extends IService, M extends BaseEsMapper> void process(IService service, BaseEsMapper mapper) throws Exception {
    25. Class docClass = mapper.getEntityClass();
    26. String indexName = docClass.getAnnotation(Document.class).indexName();
    27. if (!mapper.existsIndex(indexName)) {
    28. boolean success = mapper.createIndex(indexName);
    29. if (success) {
    30. System.out.println("索引创建成功!indexName===>" + indexName);
    31. }
    32. } else {
    33. List sysUserAllESDocs = mapper.selectList(new LambdaEsQueryWrapper<>());
    34. System.out.println("es中有" + sysUserAllESDocs.size());
    35. // TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据
    36. Integer delete = mapper.delete(new LambdaEsQueryWrapper<>());
    37. System.out.println("删除ES之前的数据" + delete);
    38. }
    39. // TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据
    40. List list = service.lambdaQuery().getBaseMapper().selectList(new LambdaQueryWrapper<>());
    41. List document = new ArrayList<>();
    42. for (D o : list) {
    43. D docInstance = docClass.getDeclaredConstructor().newInstance();
    44. BeanUtils.copyProperties(o, docInstance);
    45. Method setIdMethod = docClass.getMethod("setMysqlId", Long.class);
    46. Long idValue = (Long) o.getClass().getMethod("getId").invoke(o);
    47. setIdMethod.invoke(docInstance, idValue);
    48. document.add(docInstance);
    49. }
    50. int successCount = mapper.insertBatch(document);
    51. System.out.println("在MySQL表中有:" + list.size() + "条数据");
    52. System.out.println("向ES中成功添加:" + successCount + "条数据");
    53. if (list.size() == successCount) {
    54. System.out.println("同步成功");
    55. } else {
    56. int i = list.size() - successCount;
    57. System.out.println("同步失败,有" + i + "条数据未写入ES,请联系管理员!");
    58. }
    59. }
    60. }

    自定义线程池类:

    1. package com.ruoyi.web.es.conifg;
    2. import com.google.common.util.concurrent.ThreadFactoryBuilder;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    6. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
    7. import java.util.concurrent.LinkedBlockingQueue;
    8. import java.util.concurrent.ThreadPoolExecutor;
    9. import java.util.concurrent.TimeUnit;
    10. /**
    11. * 七大参数:
    12. * 1 corePoolSize:线程池核心线程数量,核心线程不会被回收,即使没有任务执行,也会保持空闲状态。如果线程池中的线程少于此数目,则在执行任务时创建。
    13. * 2 maximumPoolSize:池允许最大的线程数,当线程数量达到corePoolSize,且workQueue队列塞满任务了之后,继续创建线程。
    14. * 3 keepAliveTime:超过corePoolSize之后的“临时线程”的存活时间。
    15. * 4 unit:keepAliveTime的单位。
    16. * 5 workQueue:当前线程数超过corePoolSize时,新的任务会处在等待状态,并存在workQueue中,BlockingQueue是一个先进先出的阻塞式队列实现,底层实现会涉及Java并发的AQS机制,有关于AQS的相关知识,我会单独写一篇,敬请期待。
    17. * 6 threadFactory:创建线程的工厂类,通常我们会自顶一个threadFactory设置线程的名称,这样我们就可以知道线程是由哪个工厂类创建的,可以快速定位。
    18. * 7 handler:线程池执行拒绝策略,当线数量达到maximumPoolSize大小,并且workQueue也已经塞满了任务的情况下,线程池会调用handler拒绝策略来处理请求。
    19. * 四大拒绝策略:
    20. * 1 AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。
    21. * 2 DiscardPolicy:直接抛弃不处理。
    22. * 3 DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务
    23. * 4 CallerRunsPolicy:使用当前调用的线程来执行此任务
    24. *
    25. * @author gao
    26. */
    27. @Configuration
    28. public class ThreadPoolTaskExecutorConfig {
    29. // 核心线程数
    30. private static final int corePoolSize = 4;
    31. // 最大线程数
    32. private static final int maxPoolSize = 4;
    33. // 允许线程空闲时间(单位:默认为秒)
    34. private static final int keepAliveTime = 10;
    35. // 缓冲队列数
    36. private static final int queueCapacity = 15;
    37. // 线程池名前缀
    38. private static final String threadNamePrefix = "Thread-PushMessage-Service-";
    39. @Bean("pushMysqlToEsExecutor")
    40. public ThreadPoolTaskExecutor pushMessageExecutor() {
    41. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    42. executor.setCorePoolSize(corePoolSize);
    43. executor.setMaxPoolSize(maxPoolSize);
    44. executor.setQueueCapacity(queueCapacity);
    45. executor.setKeepAliveSeconds(keepAliveTime);
    46. executor.setThreadNamePrefix(threadNamePrefix);
    47. // setWaitForTasksToCompleteOnShutdown(true): 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
    48. executor.setWaitForTasksToCompleteOnShutdown(true);
    49. // setAwaitTerminationSeconds(60): 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
    50. executor.setAwaitTerminationSeconds(60);
    51. // 线程池对拒绝任务的处理策略
    52. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    53. // 初始化
    54. executor.initialize();
    55. return executor;
    56. }
    57. @Bean(name = "pushMysqlToEsScheduler")
    58. public ThreadPoolTaskScheduler taskScheduler() {
    59. ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    60. ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
    61. corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
    62. new LinkedBlockingQueue<>(queueCapacity),
    63. new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build()
    64. );
    65. scheduler.setPoolSize(customThreadPool.getMaximumPoolSize());
    66. scheduler.setThreadNamePrefix(threadNamePrefix);
    67. return scheduler;
    68. }
    69. }

    大家知道有几种保证数据一致性的解决方案呢?

    ①通过读写锁来实现

    ②通过中间件canal伪装成MySQL的从节点来实现(参考地址:超详细的Canal入门,看这篇就够了!-CSDN博客

    ③通过MQ异步消息推送实现,MySQL数据发生变化时向MQ发送消息

    ④通过定时任务获取MySQL中更新的数据

    ..........

    最后:①②是强一致性的解决方案,③④是最终一致性的解决方案。项目实战是一个漫长的过程,山高路远,道阻且长!加油💪🏻!

  • 相关阅读:
    大佬公司的github地址
    IE惯导数据紧组合处理过程与方法
    用向量数据库Milvus Cloud 搭建AI聊天机器人
    jmeter之跨线程关联
    Linux删除空目录/非空目录和文件
    基于EasyExcel实现的分页数据下载封装
    部署你自己的导航站-dashy
    如何查看自己的GPU型号以及配置信息
    模拟电路总结
    卡码网语言基础课 |开房门
  • 原文地址:https://blog.csdn.net/m0_68921558/article/details/139628241