• canal+es+kibana+springboot


    1、环境准备

    服务器:Centos7

    Jdk版本:1.8

    Mysql版本:5.7.44

    Canal版本:1.1.7

    Es版本:7.12.1

    kibana版本:7.12.1

    软件包下载地址:链接:https://pan.baidu.com/s/1jRpCJP0-hr9aIghC2ZbS4g 提取码:zzzz

    IP地址            安装软件
    192.168.50.210    Mysql,Canal
    192.168.50.211    Es,Kibana

    2、安装es以及kibana

    2.1 安装docker
    1. #设置源
    2. wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
    3. # 安装依赖
    4. yum install -y yum-utils device-mapper-persistent-data lvm2
    5. # 安装docker
    6. yum install -y docker-ce
    7. # 检查安装
    8. docker -v
    9. # 启动
    10. systemctl start docker

    设置容器镜像加速地址:登录阿里云容器镜像服务,进入 镜像工具 -> 镜像加速器,获取你的加速器地址

    1. # https://xxxxxxx.mirror.aliyuncs.com 替换成你的地址 !!!!!!!!!!!!
    2. sudo mkdir -p /etc/docker
    3. sudo tee /etc/docker/daemon.json <<-'EOF'
    4. {
    5.  "registry-mirrors": ["https://xxxxxxx.mirror.aliyuncs.com"]
    6. }
    7. EOF
    8. sudo systemctl daemon-reload
    9. sudo systemctl restart docker

    设置开机启动

    systemctl enable docker.service
    2.2 安装es
    1. # 创建容器网络 es-net
    2. docker network create es-net
    3. # docker 安装 es
    4. docker run -d \
    5. --name es \
    6.    -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
    7.    -e "discovery.type=single-node" \
    8.    -v es-data:/usr/share/elasticsearch/data \
    9.    -v es-plugins:/usr/share/elasticsearch/plugins \
    10.    --privileged \
    11.    --network es-net \
    12.    -p 9200:9200 \
    13.    -p 9300:9300 \
    14. elasticsearch:7.12.1
    15. # 开通端口
    16. sudo iptables -A INPUT -p tcp --dport 9200 -j ACCEPT
    17. sudo iptables -A INPUT -p tcp --dport 9300 -j ACCEPT
    2.3 安装kibana
    1. # docker 安装 kibana
    2. docker run -d \
    3. --name kibana \
    4. -e ELASTICSEARCH_HOSTS=http://es:9200 \
    5. -e "I18N_LOCALE=zh-CN" \
    6. --network=es-net \
    7. -p 5601:5601 \
    8. kibana:7.12.1
    9. # 开通端口
    10. sudo iptables -A INPUT -p tcp --dport 5601 -j ACCEPT

    3、 安装mysql以及canal

    3.1 安装Jdk
    1. # 创建jdk安装路径
    2. mkdir -p /opt/java
    3. #将 jdk-8u301-linux-x64.tar.gz 放置 /opt/java
    4. mv /youpath/jdk-8u301-linux-x64.tar.gz /opt/java
    5. # 解压
    6. cd /opt/java
    7. tar -zxvf jdk-8u301-linux-x64.tar.gz
    8. # 添加环境变量
    9. vi /etc/profile
    10. # 加入如下片段
    11. JAVA_HOME=/opt/java/jdk1.8.0_301
    12. JRE_HOME=/opt/java/jdk1.8.0_301/jre
    13. PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    14. CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
    15. export JAVA_HOME JRE_HOME PATH CLASSPATH
    16. # 保存
    17. # 刷新环境变量
    18. source /etc/profile
    19. # 检查
    20. java -version
    3.2 安装Mysql
    1. # 将安装包 mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz 放入/opt下
    2. cd /opt
    3. tar zxvf mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz
    4. mv mysql-5.7.44-linux-glibc2.12-x86_64 mysql
    5. # 删除安装包
    6. rm mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz
    7. # 添加环境变量
    8. vi /etc/profile
    9. # 加入如下代码段
    10. export PATH=/opt/mysql/bin:$PATH
    11. # 刷新环境变量
    12. source /etc/profile
    13. # 创建数据目录
    14. mkdir -p /opt/mysql/data
    15. # 创建用户 mysql
    16. useradd -m mysql
    17. # 将/opt/mysql 权限给到mysql用户
    18. chown -R mysql:mysql /opt/mysql
    19. # 切换用户
    20. su mysql
    21. # 初始化mysql
    22. mysqld --initialize  --user=mysql --basedir=/opt/mysql --datadir=/opt/mysql/data

    如下:记录初始密码,下边要用到

    1. # 编写配置文件(以下修改 /etc 下的文件需要 root 权限,请先执行 exit 切回 root 用户)
    2. vi /etc/my.cnf
    3. # 新增或者修改参数如下
    4. [mysqld]
    5. symbolic-links=0 # 禁用软连接
    6. user=mysql # 用户
    7. basedir=/opt/mysql
    8. datadir=/opt/mysql/data
    9. socket=/tmp/mysql.sock
    10. lower_case_table_names=1
    11. server-id=1
    12. port=3306
    13. log-bin=/opt/mysql/mysql-bin
    14. binlog-format=ROW
    15. expire-logs-days=15
    1. #复制启动脚本
    2. cp /opt/mysql/support-files/mysql.server /etc/init.d/mysqld
    3. #启动mysql
    4. /etc/init.d/mysqld start
    1. # 修改数据库密码
    2. mysql -uroot -p
    3. # 这里输入的是上边初始的默认密码
    4. mysql>set password=password('root');
    5. # 创建用户
    6. mysql>use mysql;
    7. mysql>CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    8. mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    9. mysql>FLUSH PRIVILEGES;
    10. # 创建数据库
    11. mysql>create database canal_test character set utf8mb4 collate utf8mb4_bin;
    12. mysql>use canal_test;
    13. # 创建表
    14. mysql>CREATE TABLE open_user
    15. (
    16. id bigint(21) not null auto_increment,
    17. user_name varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL,
    18. sex int(11) DEFAULT 1,
    19. portrait varchar(255) NOT NULL,
    20. create_time datetime DEFAULT NULL,
    21. update_time datetime DEFAULT NULL,
    22. primary key (id)
    23. );
    24. # 退出
    25. mysql>exit;
    3.3 安装canal.deployer
    1. # 创建 canal用户
    2. useradd -m canal
    3. # 设置密码
    4. passwd canal
    5. # 密码为 canal,输入两遍即可
    6. su canal
    7. cd ~
    8. mkdir canal-deployer
    9. # 将 canal.deployer-1.1.7.tar.gz 移至 canal-deployer
    10. mv /youpath/canal.deployer-1.1.7.tar.gz /home/canal/canal-deployer
    11. cd /home/canal/canal-deployer
    12. tar -zxvf canal.deployer-1.1.7.tar.gz
    13. cd conf/example
    14. vi instance.properties
    15. # 修改如下配置
    16. canal.instance.master.address=127.0.0.1:3306
    17. canal.instance.dbUsername=canal
    18. canal.instance.dbPassword=canal
    19. # 保存
    20. # 进入到启动命令目录下
    21. cd /home/canal/canal-deployer/bin
    22. sh ./startup.sh
    23. # 查看日志
    24. tail -f /home/canal/canal-deployer/logs/example/example.log

    3.4 安装canal.adapter
    1. # 接上
    2. cd ~
    3. mkdir canal-adapter
    4. # 将 canal.adapter-1.1.7.tar.gz 移至 canal-adapter
    5. mv /youpath/canal.adapter-1.1.7.tar.gz /home/canal/canal-adapter
    6. cd canal-adapter
    7. tar -zxvf canal.adapter-1.1.7.tar.gz
    8. # 修改配置文件 conf/application.yml 为如下(注意保留 YAML 缩进)
    1. server:
    2.   port: 8081
    3. spring:
    4.   jackson:
    5.     date-format: yyyy-MM-dd HH:mm:ss
    6.     time-zone: GMT+8
    7.     default-property-inclusion: non_null
    8. canal.conf:
    9.   mode: tcp #tcp kafka rocketMQ rabbitMQ
    10.   flatMessage: true
    11.   zookeeperHosts:
    12.   syncBatchSize: 1000
    13.   retries: -1
    14.   timeout:
    15.   accessKey:
    16.   secretKey:
    17.   consumerProperties:
    18.     # canal tcp consumer
    19.     canal.tcp.server.host: 127.0.0.1:11111
    20.     canal.tcp.zookeeper.hosts:
    21.     canal.tcp.batch.size: 500
    22.     canal.tcp.username:
    23.     canal.tcp.password:
    24.   srcDataSources:
    25.     defaultDS:
    26.       url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true&useSSL=false
    27.       username: canal
    28.       password: canal
    29.   canalAdapters:
    30.   - instance: example # canal instance Name or mq topic name
    31.     groups:
    32.     - groupId: g1
    33.       outerAdapters:
    34.       - name: logger
    35.       - name: es7
    36.         hosts: http://192.168.50.211:9200 # es地址
    37.         properties:
    38.           mode: rest # transport or rest
    39.           # security.auth: test:123456 # only used for rest mode
    40.           cluster.name: docker-cluster

    为了让同步到 es 的日期格式化为 yyyy-MM-dd HH:mm:ss,需用修改过的 client-adapter.es7x-1.1.7-jar-with-dependencies.jar(应包含在上文的软件包下载链接中)替换 /home/canal/canal-adapter/plugin 目录中的同名文件

    1. # 设置数据以及es映射信息
    2. # 进入到配置目录下的es7目录
    3. cd /home/canal/canal-adapter/conf/es7
    4. # 创建文件 open_user.yml 内容如下:
    1. dataSourceKey: defaultDS #对应 application.yml 中 srcDataSources 下的 key
    2. destination: example #对应 application.yml 中 canalAdapters 的 instance 名称
    3. groupId: g1
    4. esMapping:
    5.   _index: open_user
    6.   _type: _doc
    7.   _id: _id
    8.   sql: "SELECT u.id AS _id,u.user_name AS userName,u.sex,u.portrait,u.create_time as createTime,u.update_time as updateTime FROM open_user u"
    9.   commitBatch: 3000
    1. # 启动
    2. # 进入启动目录
    3. cd /home/canal/canal-adapter/bin
    4. sh startup.sh
    5. tail -f /home/canal/canal-adapter/logs/adapter/adapter.log

    3.5 初始数据
    3.5.1 创建索引

    进入kibana控制页面

    打开 kibana

    地址:http://192.168.50.211:5601

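    进入开发工具菜单,执行创建索引 open_user 的请求,例如(字段名需与 open_user.yml 中 sql 的别名保持一致):

    PUT /open_user
    {
      "mappings": {
        "properties": {
          "userName":   { "type": "text" },
          "sex":        { "type": "text" },
          "portrait":   { "type": "text" },
          "createTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
          "updateTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
        }
      }
    }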

    3.5.2 初始化Mysql数据

    在mysql中增加记录

    1. INSERT INTO canal_test.open_user (id, user_name, sex, portrait, create_time, update_time) VALUES (1, '张三', 2, '学生', '2023-11-02 16:31:21', '2023-11-02 16:39:20');
    2. INSERT INTO canal_test.open_user (id, user_name, sex, portrait, create_time, update_time) VALUES (2, '李四', 1, '美术组组长', '2023-11-03 08:57:32', '2023-11-03 08:57:34');
    3. INSERT INTO canal_test.open_user (id, user_name, sex, portrait, create_time, update_time) VALUES (3, '王五', 1, '班长', '2023-11-03 09:13:35', '2023-11-03 09:13:37');
    4. INSERT INTO canal_test.open_user (id, user_name, sex, portrait, create_time, update_time) VALUES (4, '赵六', 1, '劳动委员', '2023-11-03 09:44:45', '2023-11-03 09:44:46');
    3.5.3 初始化ES数据

    登录安装 canal 的服务器,直接调用 canal-adapter 的 REST API 将 mysql 中已有的数据全量导入 es,如下:

    curl -X POST http://127.0.0.1:8081/etl/es7/open_user.yml

    3.5.4 测试修改数据

    1)查看 adapter.log 日志

     tail -f /home/canal/canal-adapter/logs/adapter/adapter.log

    2)修改数据库 open_user 表中的数据

    update open_user set user_name = '章三' where id = 1;

    日志输出如下:

    1. 2023-11-03 16:12:02.477 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"user_name":"章三","sex":2,"portrait":"学生","create_time":1698913881000,"update_time":1698914360000}],"database":"canal_test","destination":"example","es":1698999121000,"groupId":"g1","isDdl":false,"old":[{"user_name":"张三"}],"pkNames":["id"],"sql":"","table":"open_user","ts":1698999122129,"type":"UPDATE"}
    2. 2023-11-03 16:12:02.477 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.support.ESSyncUtil - typeConvert valClass:class java.lang.String val:章三 esType:text
    3. 2023-11-03 16:12:02.483 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":1,"user_name":"章三","sex":2,"portrait":"学生","create_time":1698913881000,"update_time":1698914360000}],"database":"canal_test","destination":"example","es":1698999121000,"groupId":"g1","isDdl":false,"old":[{"user_name":"张三"}],"pkNames":["id"],"sql":"","table":"open_user","ts":1698999122129,"type":"UPDATE"}
    4. Affected indexes: open_user

    4、Spring-boot集成

    4.1 创建springboot工程引入如下依赖
    1. <dependency>
    2.     <groupId>org.springframework.boot</groupId>
    3.     <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    4. </dependency>
    4.2 创建实体类
    1. package com.example.demo.model;
    2. import com.alibaba.fastjson.annotation.JSONField;
    3. import com.fasterxml.jackson.annotation.JsonFormat;
    4. import org.springframework.data.annotation.Id;
    5. import org.springframework.data.elasticsearch.annotations.Document;
    6. import org.springframework.data.elasticsearch.annotations.Field;
    7. import org.springframework.data.elasticsearch.annotations.FieldType;
    8. import java.io.Serializable;
    9. import java.util.Date;
    10. @Document(indexName = "open_user", type = "_doc")
    11. public class OpenUser implements Serializable {
    12. @Id
    13. private String id;
    14. @Field(type = FieldType.Text)
    15. private String userName;
    16. @Field(type = FieldType.Text)
    17. private String sex;
    18. @Field(type = FieldType.Text)
    19. private String portrait;
    20. @Field(type = FieldType.Date)
    21. @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    22. @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    23. private Date createTime;
    24. @Field(type = FieldType.Date)
    25. @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    26. @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    27. private Date updateTime;
    28. public String getId() {
    29. return id;
    30. }
    31. public void setId(String id) {
    32. this.id = id;
    33. }
    34. public String getUserName() {
    35. return userName;
    36. }
    37. public void setUserName(String userName) {
    38. this.userName = userName;
    39. }
    40. public String getSex() {
    41. return sex;
    42. }
    43. public void setSex(String sex) {
    44. this.sex = sex;
    45. }
    46. public String getPortrait() {
    47. return portrait;
    48. }
    49. public void setPortrait(String portrait) {
    50. this.portrait = portrait;
    51. }
    52. public Date getCreateTime() {
    53. return createTime;
    54. }
    55. public void setCreateTime(Date createTime) {
    56. this.createTime = createTime;
    57. }
    58. public Date getUpdateTime() {
    59. return updateTime;
    60. }
    61. public void setUpdateTime(Date updateTime) {
    62. this.updateTime = updateTime;
    63. }
    64. @Override
    65. public String toString() {
    66. return "OpenUser{" + "id='" + id + '\'' + ", userName='" + userName + '\'' + ", sex='" + sex + '\''
    67. + ", portrait='" + portrait + '\'' + ", createTime=" + createTime + ", updateTime=" + updateTime + '}';
    68. }
    69. }
    4.3 创建接口
    1. package com.example.demo.controller;
    2. import com.example.demo.model.OpenUser;
    3. import org.apache.commons.lang3.StringUtils;
    4. import org.elasticsearch.index.query.QueryBuilder;
    5. import org.elasticsearch.index.query.QueryBuilders;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.data.domain.Page;
    8. import org.springframework.data.domain.PageRequest;
    9. import org.springframework.data.domain.Pageable;
    10. import org.springframework.data.domain.Sort;
    11. import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
    12. import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
    13. import org.springframework.data.elasticsearch.core.query.SearchQuery;
    14. import org.springframework.web.bind.annotation.PostMapping;
    15. import org.springframework.web.bind.annotation.RequestParam;
    16. import org.springframework.web.bind.annotation.RestController;
    17. @RestController
    18. public class SearchController {
    19. @Autowired
    20. private ElasticsearchTemplate elasticsearchTemplate;
    21. @PostMapping("/findOpenUserByUserName")
    22. public Page<OpenUser> findOpenUserByUserName(@RequestParam(value = "userName") String userName,
    23. @RequestParam(value = "pageNum", required = false) Integer pageNum,
    24. @RequestParam(value = "pageSize", required = false) Integer pageSize) {
    25. if (StringUtils.isBlank(userName)) {
    26. return null;
    27. }
    28. if (pageNum == null || pageNum < 0) {
    29. pageNum = 0; // if page is null, page = 0 size default 1
    30. }
    31. if (pageSize == null || pageSize < 0) {
    32. pageSize = 10; // if size is null, size default 10
    33. }
    34. // 分页,根据时间倒序
    35. Pageable pageable = PageRequest.of(pageNum, pageSize, Sort.Direction.DESC, "createTime");
    36. // 查询姓名
    37. QueryBuilder builder = null;
    38. if (userName.matches("^[A-Za-z0-9]+$")) {
    39. builder = QueryBuilders.boolQuery()
    40. .must(QueryBuilders.wildcardQuery("userName", ("*" + userName + "*").toLowerCase()));
    41. } else {
    42. builder = QueryBuilders.boolQuery()
    43. .must(QueryBuilders.matchPhraseQuery("userName", userName.toLowerCase()));
    44. }
    45. SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(builder).withPageable(pageable).build();
    46. return elasticsearchTemplate.queryForPage(searchQuery, OpenUser.class);
    47. }
    48. }
    4.4 测试

  • 相关阅读:
    k8s1.24 部署springboot项目
    Java架构师基础框架设计
    Python图像和视频上传
    Android 12.0 系统wifi列表显示已连接但无法访问网络问题解决
    【PyTorch】MSELoss的详细理解(含源代码)
    手工编译安装nginx
    MySQL 与 PostgreSQL的区别
    【C++】不能两次杀死同一条鱼 - 浅述shared_ptr智能指针的使用方法
    计算机网络 第一章计算机网络体系结构
    【机器学习周志华】读书笔记 P3 机器学习发展历程(选读)
  • 原文地址:https://blog.csdn.net/weixin_38405770/article/details/134217814