• 基于Canal同步MySQL数据到Elasticsearch


    基于Canal同步MySQL数据到Elasticsearch

    基于 canal 同步 mysql 的数据到 elasticsearch 中。

    1、canal-server

    相关软件的安装请参考:《Canal实现数据同步》

    1.1 pom依赖

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.example</groupId>
        <artifactId>canal-to-elasticsearch</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>canal-to-elasticsearch</name>
        <description>canal to elasticsearch</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
            <relativePath/>
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.4</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    1.2 SimpleCanalClientExample编写

    package com.example.canatest.config;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    /**
     * Minimal Canal TCP client used to check that canal-server is connected to MySQL and is
     * delivering binlog row changes.
     *
     * <p>The client subscribes to every schema/table, prints each row change to stdout, acks every
     * fetched batch, and exits after {@value #MAX_EMPTY_POLLS} consecutive empty polls (sleeping
     * {@value #EMPTY_POLL_SLEEP_MS} ms after each empty poll).
     *
     * <p>Usage: {@code SimpleCanalClientExample [host [port [destination]]]}. Any omitted argument
     * falls back to the original hard-coded default ({@value #DEFAULT_HOST}:{@value #DEFAULT_PORT},
     * destination {@value #DEFAULT_DESTINATION}).
     *
     * <p>This class is only a connectivity test; the canal-adapter pipeline does not use it.
     */
    public class SimpleCanalClientExample {

        /** canal-server host used when no host argument is given. */
        private static final String DEFAULT_HOST = "192.168.94.186";
        /** canal-server TCP port used when no port argument is given. */
        private static final int DEFAULT_PORT = 11111;
        /** canal instance (destination) name used when none is given. */
        private static final String DEFAULT_DESTINATION = "example";
        /** Subscription filter regex: every table in every schema. */
        private static final String SUBSCRIBE_FILTER = ".*\\..*";
        /** Maximum number of entries fetched per getWithoutAck call. */
        private static final int BATCH_SIZE = 1000;
        /** Number of consecutive empty polls after which the client exits. */
        private static final int MAX_EMPTY_POLLS = 120;
        /** Pause between polls that returned no data. */
        private static final long EMPTY_POLL_SLEEP_MS = 1000L;

        /**
         * Connects to canal-server and prints row changes until too many empty polls occur, the
         * thread is interrupted, or processing fails.
         *
         * @param args optional {@code host}, {@code port}, {@code destination}, in that order
         * @throws NumberFormatException if a port argument is given but is not an integer
         */
        public static void main(String[] args) {
            String[] opts = args == null ? new String[0] : args;
            String host = opts.length > 0 ? opts[0] : DEFAULT_HOST;
            int port = opts.length > 1 ? Integer.parseInt(opts[1]) : DEFAULT_PORT;
            String destination = opts.length > 2 ? opts[2] : DEFAULT_DESTINATION;

            // Create the connector (empty username/password, as in the original example).
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(host, port), destination, "", "");
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(SUBSCRIBE_FILTER);
                // Roll back batches a previous session fetched but never acked, so that they are
                // delivered again to this session.
                connector.rollback();
                while (emptyCount < MAX_EMPTY_POLLS) {
                    // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    List<CanalEntry.Entry> entries = message.getEntries();
                    if (batchId == -1 || entries.isEmpty()) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        connector.ack(batchId);
                        if (!pause(EMPTY_POLL_SLEEP_MS)) {
                            System.out.println("interrupted, exit");
                            return; // the finally block still disconnects
                        }
                        continue;
                    }
                    emptyCount = 0;
                    try {
                        printEntry(entries);
                    } catch (RuntimeException e) {
                        // Processing failed: roll the batch back so it is redelivered, then stop.
                        try {
                            connector.rollback(batchId);
                        } catch (RuntimeException rollbackError) {
                            e.addSuppressed(rollbackError);
                        }
                        throw e;
                    }
                    connector.ack(batchId); // confirm the batch was processed
                }
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }

        /**
         * Sleeps for the given number of milliseconds.
         *
         * @return {@code true} if the sleep completed, {@code false} if the thread was interrupted
         *     (the interrupt flag is restored so callers can still observe it)
         */
        private static boolean pause(long millis) {
            try {
                Thread.sleep(millis);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * Prints the binlog position, table, event type and row images of every non-transaction
         * entry.
         *
         * @throws RuntimeException if an entry's store value cannot be parsed as a RowChange
         */
        private static void printEntry(List<CanalEntry.Entry> entries) {
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                // Transaction begin/end markers carry no row data.
                if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException(
                            "ERROR ## failed to parse RowChange from entry, data:" + entry, e);
                }
                CanalEntry.EventType eventType = rowChange.getEventType();
                CanalEntry.Header header = entry.getHeader();
                System.out.println(String.format(
                        "================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        header.getLogfileName(), header.getLogfileOffset(),
                        header.getSchemaName(), header.getTableName(),
                        eventType));
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        // A deleted row only has a "before" image.
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        // An inserted row only has an "after" image.
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }

        /** Prints each column as {@code name : value    update=<updated flag>}. */
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    在这里插入图片描述

    在这里插入图片描述

    注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。

    这个类只是测试,下面不使用。

    2、canal-adapter

    由于目前 canal-adapter 没有官方 Docker 镜像，所以这里拉取一个非官方镜像。

    canal-adapter安装:

    搜索镜像

    $ docker search canal-adapter
    
    • 1

    在这里插入图片描述

    拉取镜像

    $ docker pull slpcat/canal-adapter:v1.1.5
    
    • 1

    在这里插入图片描述

    启动

    $ docker run -p 8081:8081 --name canal-adapter -d slpcat/canal-adapter:v1.1.5
    
    • 1

    在这里插入图片描述

    修改配置

    $ docker exec -it canal-adapter /bin/bash
    $ cd conf/
    $ vi application.yml
    
    • 1
    • 2
    • 3
    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    canal.conf:
      mode: tcp #tcp kafka rocketMQ rabbitMQ
      flatMessage: true
      zookeeperHosts:
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      consumerProperties:
        # canal tcp consumer
        # canal.tcp.server.host需要修改
        canal.tcp.server.host: 192.168.94.186:11111
        canal.tcp.zookeeper.hosts:
        canal.tcp.batch.size: 500
        canal.tcp.username:
        canal.tcp.password:
      srcDataSources:
        defaultDS:
          # url,username,password需要修改
          url: jdbc:mysql://192.168.94.186:3306/canal_test?useUnicode=true
          username: canal
          password: canal
      canalAdapters:
      - instance: example # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          # name需要修改
          - name: es7
            # hosts需要修改
            hosts: 192.168.94.186:9200 # 127.0.0.1:9200 for rest mode
            properties:
              mode: rest
              # security.auth: test:123456 #  only used for rest mode
              # cluster.name需要修改
              cluster.name: my-es
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    $ cd /opt/canal-adapter/conf/es7
    $ cp -v mytest_user.yml canal_test_collect.yml
    # 删除其他多余的
    $ rm -rf biz_order.yml customer.yml mytest_user.yml
    $ vi canal_test_collect.yml
    
    • 1
    • 2
    • 3
    • 4
    • 5
    dataSourceKey: defaultDS
    # 需要修改
    destination: example
    # 需要修改
    groupId: g1
    esMapping:
      # 需要修改
      _index: canal_test
      _id: _id
      _type: _doc
      upsert: true
    #  pk: id
      # 需要修改
      sql: "
    SELECT
            c.id AS _id,
            c.user_id AS userId,
            c.title AS title,
            c.url AS url,
            c.note AS note,
            c.collected AS collected,
            c.created AS created,
            c.personal AS personal,
            u.username AS username,
            u.avatar AS userAvatar
    FROM
            m_collect c
    LEFT JOIN m_user u ON c.user_id = u.id
    
    "
    #  objFields:
    #    _labels: array:;
    #   etlCondition: "where c.c_time>={}"
      commitBatch: 3000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    也可以在外面编辑好,通过docker命令传输到docker容器中:

    $ docker cp canal_test_collect.yml canal-adapter:/opt/canal-adapter/conf/es7/canal_test_collect.yml
    $ docker cp application.yml canal-adapter:/opt/canal-adapter/conf/application.yml
    
    • 1
    • 2

    重启容器

    $ docker restart canal-adapter
    
    • 1

    验证是否启动成功

    $ docker logs -f canal-adapter
    
    • 1

    在这里插入图片描述

    注意：对于时间类型，后端实体类中一定要使用 LocalDateTime 或 LocalDate 类型；如果使用 Date 类型，需要自己手动设置格式。

    3、测试

    准备测试条件:

    1、首先在数据库中生成表和字段

    CREATE TABLE `m_user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `avatar` varchar(255) DEFAULT NULL,
      `created` date DEFAULT NULL,
      `lasted` date DEFAULT NULL,
      `open_id` varchar(255) DEFAULT NULL,
      `statu` int(11) DEFAULT NULL,
      `username` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
    
    CREATE TABLE `m_collect` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `collected` date DEFAULT NULL,
      `created` date DEFAULT NULL,
      `note` varchar(255) DEFAULT NULL,
      `personal` int(11) DEFAULT NULL,
      `title` varchar(255) DEFAULT NULL,
      `url` varchar(255) DEFAULT NULL,
      `user_id` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`),
      KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
      CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述

    2、然后在elasticsearch中生成索引

    # 创建索引并添加映射字段
    PUT /canal_test
    {
      "mappings": {
        "properties": {
          "collected": {
            "type": "date",
            "format": "date_optional_time||epoch_millis"
          },
          "created": {
            "type": "date",
            "format": "date_optional_time||epoch_millis"
          },
          "note": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_smart"
          },
          "personal": {
            "type": "integer"
          },
          "title": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_smart"
          },
          "url": {
            "type": "text"
          },
          "userAvatar": {
            "type": "text"
          },
          "userId": {
            "type": "long"
          },
          "username": {
            "type": "keyword"
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    在这里插入图片描述

    3、插入数据

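    INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload../../images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05', '2022-01-06', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', 'MarkerHub');
    -- canal_test 索引的同步 SQL 以 m_collect 为主表（LEFT JOIN m_user），因此还需要插入一条 m_collect 记录才会在 ES 中生成文档
    INSERT INTO `m_collect` VALUES ('19', '2022-01-06', '2022-01-06', '测试笔记', '0', '测试标题', 'https://www.baidu.com', '1');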
    
    • 1

    在这里插入图片描述

    4、查看数据

    GET /canal_test/_search
    
    • 1

    5、遇到的问题

    如果看到 canal-adapter 一直出现下面这种异常，说明启动顺序不对。正确的启动顺序应该是：mysql → es → canal（canal-server）→ canal-adapter。

    2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!
    
    • 1
  • 相关阅读:
    MyBatis-Plus分页插件和使用Mapper文件
    TwineCompile高级编译系统
    TsMaster报文发送
    解决微信小程序recycle-view使用百分比单位控制宽高时出现的内容溢出问题
    【目录】前端开发(JavaScript、Vue)
    牛客网刷题记录 || C++入门
    乐趣国学—品读“富润屋,德润身。”中的智慧
    ARM IIC总线实现温湿传感器
    2023-09-30:用go语言,给你一个整数数组 nums 和一个整数 k 。 nums 仅包含 0 和 1, 每一次移动,你可以选择 相邻 两个数字并将它们交换。 请你返回使 nums 中包含 k
    并查集-合并集合
  • 原文地址:https://blog.csdn.net/qq_30614345/article/details/134083545