服务器:Centos7
Jdk版本:1.8
Mysql版本:5.7.44
Canal版本:1.1.7
Es版本:7.12.1
kibana版本:7.12.1
软件包下载地址:链接:https://pan.baidu.com/s/1jRpCJP0-hr9aIghC2ZbS4g 提取码:zzzz

| IP地址 | 安装软件 |
|---|---|
| 192.168.50.210 | Mysql,Canal |
| 192.168.50.211 | Es,Kibana |
- #设置源
- wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
- # 安装依赖
- yum install -y yum-utils device-mapper-persistent-data lvm2
- # 安装docker
- yum install -y docker-ce
- # 检查安装
- docker -v
- # 启动
- systemctl start docker
设置容器镜像加速地址:登录阿里云容器镜像服务,进入 镜像工具 -> 镜像加速器,获取你自己的加速地址
- # https://xxxxxxx.mirror.aliyuncs.com 替换成你的地址 !!!!!!!!!!!!
- sudo mkdir -p /etc/docker
- sudo tee /etc/docker/daemon.json <<-'EOF'
- {
- "registry-mirrors": ["https://xxxxxxx.mirror.aliyuncs.com"]
- }
- EOF
- sudo systemctl daemon-reload
- sudo systemctl restart docker
设置开机启动
systemctl enable docker.service
- # 创建容器网络 es-net
- docker network create es-net
- # docker 安装 es
- docker run -d \
- --name es \
- -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
- -e "discovery.type=single-node" \
- -v es-data:/usr/share/elasticsearch/data \
- -v es-plugins:/usr/share/elasticsearch/plugins \
- --privileged \
- --network es-net \
- -p 9200:9200 \
- -p 9300:9300 \
- elasticsearch:7.12.1
-
- # 开通端口
- sudo iptables -A INPUT -p tcp --dport 9200 -j ACCEPT
- sudo iptables -A INPUT -p tcp --dport 9300 -j ACCEPT
- # docker 安装 kibana
- docker run -d \
- --name kibana \
- -e ELASTICSEARCH_HOSTS=http://es:9200 \
- -e "I18N_LOCALE=zh-CN" \
- --network=es-net \
- -p 5601:5601 \
- kibana:7.12.1
- # 开通端口
- sudo iptables -A INPUT -p tcp --dport 5601 -j ACCEPT
- # 创建jdk安装路径
- mkdir -p /opt/java
- # 将 jdk-8u301-linux-x64.tar.gz 移至 /opt/java(/yourpath 替换为安装包实际所在目录)
- mv /yourpath/jdk-8u301-linux-x64.tar.gz /opt/java
- # 解压
- cd /opt/java
- tar -zxvf jdk-8u301-linux-x64.tar.gz
- # 添加环境变量
- vi /etc/profile
- # 加入如下片段
- JAVA_HOME=/opt/java/jdk1.8.0_301
- JRE_HOME=/opt/java/jdk1.8.0_301/jre
- PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
- CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
- export JAVA_HOME JRE_HOME PATH CLASSPATH
- # 保存
- # 刷新环境变量
- source /etc/profile
- # 检查
- java -version
- # 将安装包 mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz 放入/opt下
- cd /opt
- tar zxvf mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz
- mv mysql-5.7.44-linux-glibc2.12-x86_64 mysql
- # 删除安装包
- rm mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz
- # 添加环境变量
- vi /etc/profile
- # 加入如下代码段
- export PATH=/opt/mysql/bin:$PATH
- # 刷新环境变量
- source /etc/profile
- # 创建数据目录
- mkdir -p /opt/mysql/data
-
- # 创建用户 mysql
- useradd -m mysql
- # 将 /opt/mysql 的所有权交给 mysql 用户
- chown -R mysql:mysql /opt/mysql
-
- # 切换用户
- su mysql
-
- # 初始化mysql
- mysqld --initialize --user=mysql --basedir=/opt/mysql --datadir=/opt/mysql/data
初始化完成后,输出中会包含为 root@localhost 生成的初始(临时)密码,请记录该密码,下边修改密码时要用到

- # 编写配置文件
- vi /etc/my.cnf
- # 新增或者修改参数如下
- [mysqld]
- symbolic-links=0 # 禁用软连接
- user=mysql # 用户
- basedir=/opt/mysql
- datadir=/opt/mysql/data
- socket=/tmp/mysql.sock
- lower_case_table_names=1
- server-id=1
- port=3306
- log-bin=/opt/mysql/mysql-bin
- binlog-format=ROW
- expire-logs-days=15
- #复制启动脚本
- cp /opt/mysql/support-files/mysql.server /etc/init.d/mysqld
- #启动mysql
- /etc/init.d/mysqld start
- # 修改数据库密码
- mysql -uroot -p
- # 这里输入的是上边初始的默认密码
- mysql>set password=password('root');
- # 创建用户
- mysql>use mysql;
- mysql>CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
- mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- mysql>FLUSH PRIVILEGES;
- # 创建数据库
- mysql>create database canal_test character set utf8mb4 collate utf8mb4_bin;
- mysql>use canal_test;
- # 创建表
- mysql>CREATE TABLE open_user
- (
- id bigint(21) not null auto_increment,
- user_name varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL,
- sex int(11) DEFAULT 1,
- portrait varchar(255) NOT NULL,
- create_time datetime DEFAULT NULL,
- update_time datetime DEFAULT NULL,
- primary key (id)
- );
- # 退出
- mysql>exit;
- # 创建 canal用户
- useradd -m canal
- # 设置密码
- passwd canal
- # 密码为 canal,输入两遍即可
- su canal
- cd ~
- mkdir canal-deployer
- # 将 canal.deployer-1.1.7.tar.gz 移至 canal-deployer(/yourpath 替换为安装包实际所在目录)
- mv /yourpath/canal.deployer-1.1.7.tar.gz /home/canal/canal-deployer
- cd /home/canal/canal-deployer
- tar -zxvf canal.deployer-1.1.7.tar.gz
- cd conf/example
- vi instance.properties
- # 修改如下配置
- canal.instance.master.address=127.0.0.1:3306
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- # 保存
- # 进入到启动命令目录下
- cd /home/canal/canal-deployer/bin
- sh ./startup.sh
- # 查看日志
- tail -f /home/canal/canal-deployer/logs/example/example.log

- # 接上
- cd ~
- mkdir canal-adapter
- # 将 canal.adapter-1.1.7.tar.gz 移至 canal-adapter(/yourpath 替换为安装包实际所在目录)
- mv /yourpath/canal.adapter-1.1.7.tar.gz /home/canal/canal-adapter
- cd canal-adapter
- tar -zxvf canal.adapter-1.1.7.tar.gz
- # 修改配置文件 conf/application.yml 为如下
- server:
- port: 8081
- spring:
- jackson:
- date-format: yyyy-MM-dd HH:mm:ss
- time-zone: GMT+8
- default-property-inclusion: non_null
-
- canal.conf:
- mode: tcp #tcp kafka rocketMQ rabbitMQ
- flatMessage: true
- zookeeperHosts:
- syncBatchSize: 1000
- retries: -1
- timeout:
- accessKey:
- secretKey:
- consumerProperties:
- # canal tcp consumer
- canal.tcp.server.host: 127.0.0.1:11111
- canal.tcp.zookeeper.hosts:
- canal.tcp.batch.size: 500
- canal.tcp.username:
- canal.tcp.password:
- srcDataSources:
- defaultDS:
- url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true&useSSL=false
- username: canal
- password: canal
- canalAdapters:
- - instance: example # canal instance Name or mq topic name
- groups:
- - groupId: g1
- outerAdapters:
- - name: logger
- - name: es7
- hosts: http://192.168.50.211:9200 # es地址
- properties:
- mode: rest # transport or rest
- # security.auth: test:123456 # only used for rest mode
- cluster.name: docker-cluster
若需将日期字段格式化为 yyyy-MM-dd HH:mm:ss,需用新的 client-adapter.es7x-1.1.7-jar-with-dependencies.jar 替换 /home/canal/canal-adapter/plugin 目录中的同名文件
- # 设置数据以及es映射信息
- # 进入到配置目录下的es7目录
- cd /home/canal/canal-adapter/conf/es7
- # 创建文件 open_user.yml 内容如下:
- dataSourceKey: defaultDS # 对应 application.yml 中 srcDataSources 下的 key
- destination: example # 对应 application.yml 中 canalAdapters 的 instance 名称(canal 实例名)
- groupId: g1
- esMapping:
- _index: open_user
- _type: _doc
- _id: _id
- sql: "SELECT u.id AS _id,u.user_name AS userName,u.sex,u.portrait,u.create_time as createTime,u.update_time as updateTime FROM open_user u"
- commitBatch: 3000
- # 启动
- # 进入启动目录
- cd /home/canal/canal-adapter/bin
- sh startup.sh
- tail -f /home/canal/canal-adapter/logs/adapter/adapter.log

进入kibana控制页面
打开 kibana(浏览器访问 http://192.168.50.211:5601)
进入开发工具菜单

在mysql中增加记录
- INSERT INTO canal_test.open_user (id, user_name, sex, portrait, create_time, update_time) VALUES (1, '张三', 2, '学生', '2023-11-02 16:31:21', '2023-11-02 16:39:20');
- INSERT INTO canal_test.open_user (id, user_name, sex, portrait, create_time, update_time) VALUES (2, '李四', 1, '美术组组长', '2023-11-03 08:57:32', '2023-11-03 08:57:34');
- INSERT INTO canal_test.open_user (id, user_name, sex, portrait, create_time, update_time) VALUES (3, '王五', 1, '班长', '2023-11-03 09:13:35', '2023-11-03 09:13:37');
- INSERT INTO canal_test.open_user (id, user_name, sex, portrait, create_time, update_time) VALUES (4, '赵六', 1, '劳动委员', '2023-11-03 09:44:45', '2023-11-03 09:44:46');
登录 canal 所在服务器,直接调用 canal-adapter 的 REST API,如下:
curl -X POST http://127.0.0.1:8081/etl/es7/open_user.yml
![]()
1)查看 adapter.log 日志
tail -f /home/canal/canal-adapter/logs/adapter/adapter.log
2)修改数据库 open_user 表中的数据
update open_user set user_name = '章三' where id = 1;
日志输出如下:
- 2023-11-03 16:12:02.477 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"user_name":"章三","sex":2,"portrait":"学生","create_time":1698913881000,"update_time":1698914360000}],"database":"canal_test","destination":"example","es":1698999121000,"groupId":"g1","isDdl":false,"old":[{"user_name":"张三"}],"pkNames":["id"],"sql":"","table":"open_user","ts":1698999122129,"type":"UPDATE"}
- 2023-11-03 16:12:02.477 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.support.ESSyncUtil - typeConvert valClass:class java.lang.String val:章三 esType:text
- 2023-11-03 16:12:02.483 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":1,"user_name":"章三","sex":2,"portrait":"学生","create_time":1698913881000,"update_time":1698914360000}],"database":"canal_test","destination":"example","es":1698999121000,"groupId":"g1","isDdl":false,"old":[{"user_name":"张三"}],"pkNames":["id"],"sql":"","table":"open_user","ts":1698999122129,"type":"UPDATE"}
- Affected indexes: open_user
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
- </dependency>
- package com.example.demo.model;
-
- import com.alibaba.fastjson.annotation.JSONField;
- import com.fasterxml.jackson.annotation.JsonFormat;
- import org.springframework.data.annotation.Id;
- import org.springframework.data.elasticsearch.annotations.Document;
- import org.springframework.data.elasticsearch.annotations.Field;
- import org.springframework.data.elasticsearch.annotations.FieldType;
-
- import java.io.Serializable;
- import java.util.Date;
-
- @Document(indexName = "open_user", type = "_doc")
- public class OpenUser implements Serializable {
-
- @Id
- private String id;
-
- @Field(type = FieldType.Text)
- private String userName;
-
- @Field(type = FieldType.Text)
- private String sex;
-
- @Field(type = FieldType.Text)
- private String portrait;
-
- @Field(type = FieldType.Date)
- @JSONField(format = "yyyy-MM-dd HH:mm:ss")
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date createTime;
-
- @Field(type = FieldType.Date)
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- @JSONField(format = "yyyy-MM-dd HH:mm:ss")
- private Date updateTime;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getSex() {
- return sex;
- }
-
- public void setSex(String sex) {
- this.sex = sex;
- }
-
- public String getPortrait() {
- return portrait;
- }
-
- public void setPortrait(String portrait) {
- this.portrait = portrait;
- }
-
- public Date getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(Date createTime) {
- this.createTime = createTime;
- }
-
- public Date getUpdateTime() {
- return updateTime;
- }
-
- public void setUpdateTime(Date updateTime) {
- this.updateTime = updateTime;
- }
-
- @Override
- public String toString() {
- return "OpenUser{" + "id='" + id + '\'' + ", userName='" + userName + '\'' + ", sex='" + sex + '\''
- + ", portrait='" + portrait + '\'' + ", createTime=" + createTime + ", updateTime=" + updateTime + '}';
- }
- }
- package com.example.demo.controller;
-
- import com.example.demo.model.OpenUser;
- import org.apache.commons.lang3.StringUtils;
- import org.elasticsearch.index.query.QueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.domain.Page;
- import org.springframework.data.domain.PageRequest;
- import org.springframework.data.domain.Pageable;
- import org.springframework.data.domain.Sort;
- import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
- import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
- import org.springframework.data.elasticsearch.core.query.SearchQuery;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- public class SearchController {
-
- @Autowired
- private ElasticsearchTemplate elasticsearchTemplate;
-
- @PostMapping("/findOpenUserByUserName")
- public Page<OpenUser> findOpenUserByUserName(@RequestParam(value = "userName") String userName,
- @RequestParam(value = "pageNum", required = false) Integer pageNum,
- @RequestParam(value = "pageSize", required = false) Integer pageSize) {
- if (StringUtils.isBlank(userName)) {
- return null;
- }
- if (pageNum == null || pageNum < 0) {
- pageNum = 0; // default to the first page (index 0) when pageNum is missing or negative
- }
- if (pageSize == null || pageSize <= 0) {
- pageSize = 10; // default to 10 when pageSize is missing or not positive (PageRequest requires size >= 1)
- }
- // 分页,根据时间倒序
- Pageable pageable = PageRequest.of(pageNum, pageSize, Sort.Direction.DESC, "createTime");
- // 查询姓名
- QueryBuilder builder = null;
- if (userName.matches("^[A-Za-z0-9]+$")) {
- builder = QueryBuilders.boolQuery()
- .must(QueryBuilders.wildcardQuery("userName", ("*" + userName + "*").toLowerCase()));
- } else {
- builder = QueryBuilders.boolQuery()
- .must(QueryBuilders.matchPhraseQuery("userName", userName.toLowerCase()));
- }
- SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(builder).withPageable(pageable).build();
- return elasticsearchTemplate.queryForPage(searchQuery, OpenUser.class);
- }
- }
