• Doris: from a detailed theory walkthrough to use at tens-of-millions-of-rows scale


    Doris Testing and Hands-On Practice

    Command-Line Operations

    ```shell
    ### Start the FE on wm_doris01
    [doris01]$ sh /opt/module/doris/doris0.12.21/fe/bin/start_fe.sh --daemon
    [doris01]$ jps
    19929 Jps
    19871 PaloFe
    [doris02]$ sh /opt/module/doris/doris0.12.21/fe/bin/start_fe.sh --helper doris01:9010 --daemon
    [doris03]$ sh /opt/module/doris/doris0.12.21/fe/bin/start_fe.sh --helper doris01:9010 --daemon
    ### Start the BE on wm_doris01, wm_doris02 and wm_doris03
    [doris01]$ sh /opt/module/doris/doris0.12.21/be/bin/start_be.sh --daemon
    [doris02]$ sh /opt/module/doris/doris0.12.21/be/bin/start_be.sh --daemon
    [doris03]$ sh /opt/module/doris/doris0.12.21/be/bin/start_be.sh --daemon
    [doris01]$ mysql -hdoris01 -P 9030 -uroot
    mysql> SET PASSWORD FOR 'root' = PASSWORD('niaoshu123456');
    mysql> exit;
    mysql -hdoris01 -P 9030 -uroot -pniaoshu123456
    mysql> alter system add backend "doris01:9050";
    mysql> alter system add backend "doris02:9050";
    mysql> alter system add backend "doris03:9050";
    mysql> alter system add broker niaoshu_broker_hdfs "doris01:8000","doris02:8000","doris03:8000";
    mysql> alter system add follower "doris02:9010";
    mysql> alter system add observer "doris03:9010";
    ### First start of the follower/observer FE nodes (doris02, doris03)
    sh /opt/module/doris/doris0.12.21/fe/bin/start_fe.sh --helper doris01:9010 --daemon
    mysql> show proc '/backends';   ### check BE status
    mysql> show proc '/brokers';    ### check Broker status
    mysql> show proc '/frontends';  ### check FE status
    ```

    Notes

    ① Adding FE nodes: an FE plays one of three roles, Leader, Follower, or Observer. A cluster has exactly one Leader by default and may have multiple Followers and Observers. The Leader and the Followers form a Paxos-style election group: if the Leader goes down, one of the remaining Followers is elected as the new Leader, which guarantees HA. An Observer only replicates data from the Leader and does not take part in the election. If you deploy only a single FE, that FE is the Leader by default.

    ② Deploy a second FE (Follower) on doris02 and an Observer on doris03. The --helper argument points to the Leader's address and edit-log port.

    ③ Adding BE nodes: just run ALTER SYSTEM ADD BACKEND from the MySQL client, exactly as in the installation steps above. To remove a BE node, use ALTER SYSTEM DECOMMISSION BACKEND "be_host:be_heartbeat_service_port".

    ```shell
    ### Remove FE nodes
    mysql> alter system drop follower "hadoop01:9010";
    mysql> alter system drop observer "hadoop01:9010";
    ## Syntax for removing an FE node:
    ## ALTER SYSTEM DROP FOLLOWER[OBSERVER] "fe_host:edit_log_port";
    ### Tip: when removing a Follower FE, make sure the remaining Followers (including the Leader) still add up to an odd number
    #### Remove a BE node
    mysql> ALTER SYSTEM DECOMMISSION BACKEND "hadoop01:9050";
    ## Syntax: ALTER SYSTEM DECOMMISSION BACKEND "be_host:be_heartbeat_service_port";
    ### Remove brokers
    mysql> ALTER SYSTEM DROP BROKER broker_name "hadoop01:9050";
    ## Syntax: ALTER SYSTEM DROP BROKER broker_name "broker_host:broker_ipc_port";
    ## Syntax: ALTER SYSTEM DROP ALL BROKER broker_name;
    ```

    Tip: the syntax ALTER SYSTEM DROP BACKEND "be_host:be_heartbeat_service_port"; also exists, but DROP BACKEND removes the BE immediately and the data on it can no longer be recovered! For that reason we strongly discourage removing BE nodes with DROP BACKEND; when you run it, Doris shows a confirmation prompt to guard against accidental use.
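    DECOMMISSION, by contrast, is the safe path: the BE first migrates its tablets away and is only dropped once it holds no data. A minimal sketch of watching and, if needed, aborting that process (host and port are the same illustrative values as above):

    ```sql
    -- After ALTER SYSTEM DECOMMISSION BACKEND the node stays visible for a while;
    -- watch its TabletNum fall towards 0, after which it disappears from the list.
    SHOW PROC '/backends';
    -- A decommission that has not finished yet can still be cancelled:
    CANCEL DECOMMISSION BACKEND "hadoop01:9050";
    ```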

    Important: if, when setting up HA, the other FE nodes were first started without --helper, the fix is to delete everything under the doris-meta directory of the node whose alive value is false (keeping a backup is recommended),

    and then start it again with:

    sh /opt/module/doris/fe/bin/start_fe.sh --helper doris01:9010 --daemon
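    Concretely, the backup-and-restart sequence on the broken node might look like this (a sketch; the doris-meta location is assumed to be the FE default next to its bin directory, so adjust it to the meta_dir in your fe.conf):

    ```shell
    # Keep the stale metadata around instead of deleting it outright
    mv /opt/module/doris/fe/doris-meta /opt/module/doris/fe/doris-meta.bak
    mkdir -p /opt/module/doris/fe/doris-meta
    # Re-join the cluster, this time pointing --helper at the Leader
    sh /opt/module/doris/fe/bin/start_fe.sh --helper doris01:9010 --daemon
    ```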

    Creating tables

    ```sql
    create table niaoshu.niaoshu_snap (
        uid int comment 'user uid',
        tid int comment 'teacher id',
        aid varchar(32) comment 'aid'
    )
    duplicate key(uid)
    distributed by hash(uid) buckets 10;

    create table niaoshu.niaoshu_mid (
        uid int COMMENT 'user uid',
        k_id varchar(32) COMMENT 'course id'
    )
    duplicate key(uid)
    distributed by hash(uid) buckets 10;
    ```

    Stream Load (streaming import)

    ```shell
    curl --location-trusted -u root -H "label:123" -H "column_separator:," \
        -T niaoshu.csv -X PUT http://doris01:8030/api/niaoshu/niaoshu/_stream_load
    curl --location-trusted -u root -H "label:niaoshu_mid1" -H "column_separator:," \
        -T niaoshu.csv -X PUT http://doris01:8030/api/niaoshu/niaoshu_mid/_stream_load
    ```
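    Stream Load labels must be unique within a database, so in practice the label is generated per run rather than hard-coded. A minimal shell sketch (the file name, credentials, and label format are illustrative):

    ```shell
    # Generate a unique label per run so repeated runs are not rejected as duplicate labels
    label="niaoshu_snap_$(date +%Y%m%d%H%M%S)"
    curl --location-trusted -u root:niaoshu123456 \
        -H "label:${label}" -H "column_separator:," \
        -T niaoshu.csv -X PUT http://doris01:8030/api/niaoshu/niaoshu_snap/_stream_load
    ```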

    Third-party import methods

    Heads-up: in Doris, only the olap engine stores data itself; the other table engines do not store any data.

    1. Connect directly to a table in MySQL by creating a table with the same schema

    ```sql
    CREATE TABLE niaoshu.niaoshu_1 (
        par_date int,
        a_id varchar(32) COMMENT 'aid',
        b_id varchar(32) COMMENT 'bid',
        c_id varchar(32) COMMENT 'cid',
        d_id varchar(32) COMMENT 'did'
    )
    ENGINE=mysql
    PROPERTIES
    (
        "host" = "192.168.10.1",
        "port" = "3306",
        "user" = "root",
        "password" = "abcd123456",
        "database" = "niaoshu",
        "table" = "niaoshu_mid"
    );
    ```
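    Once this table exists, Doris forwards queries to MySQL on the fly; the rows themselves never move unless you copy them. A small usage sketch (niaoshu.niaoshu_1_olap is a hypothetical olap table with the same columns, not part of the original):

    ```sql
    -- Query the MySQL-backed table directly; the data stays in MySQL
    SELECT COUNT(*) FROM niaoshu.niaoshu_1;
    -- Optionally materialize it into an OLAP table for faster analytics
    -- (niaoshu_1_olap is a hypothetical olap table with the same schema)
    INSERT INTO niaoshu.niaoshu_1_olap SELECT * FROM niaoshu.niaoshu_1;
    ```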

    2. Create a broker external table over a data file stored on HDFS, with fields separated by "\t" and lines terminated by "\n"

    Heads-up: external tables only support the csv and textfile formats.

    ```sql
    CREATE EXTERNAL TABLE niaoshu.niaoshu_2 (
        id VARCHAR(32) COMMENT 'id',
        a_type VARCHAR(32) COMMENT 'student course type',
        b_name VARCHAR(32) COMMENT 'student name'
    )
    ENGINE=broker
    PROPERTIES (
        "broker_name" = "niaoshu",
        "path" = "hdfs://hadoop:8020/user/hive/warehouse/wm_ods.db/niaoshu_test/*",
        "column_separator" = "\t",
        "line_delimiter" = "\n"
    )
    BROKER PROPERTIES (
        "username" = "hadoop",
        "password" = "abcd1234"
    );
    ```

    Counting the data that lives in Hive:

    a count of 21 rows through this external table over the Hive data took about 1.78 s, for example with the query sketched below.
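    The exact statement is not given in the original; a simple count along these lines is the kind of query behind that 1.78 s figure:

    ```sql
    -- Count the rows exposed through the broker external table over the Hive warehouse files
    SELECT COUNT(*) FROM niaoshu.niaoshu_2;
    ```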

    3. Import the data into Doris with a broker load

    Create a dynamic-partition table

    ```sql
    create table niaoshu.niaoshu3 (
        -- par_date must be declared and must lead the key columns (see the tip below)
        par_date date COMMENT 'partition date',
        uid int COMMENT 'user id',
        name varchar(526) COMMENT 'nickname',
        phone varchar(32) COMMENT 'phone number'
    )
    ENGINE=olap
    DUPLICATE KEY(par_date, uid)
    COMMENT 'hdfs data imported into doris - test'
    PARTITION BY RANGE(par_date)()
    DISTRIBUTED BY HASH(uid) BUCKETS 9
    PROPERTIES(
        "storage_medium" = "SSD",
        "dynamic_partition.enable" = "true",
        "dynamic_partition.time_unit" = "DAY",
        "dynamic_partition.end" = "7",
        "dynamic_partition.prefix" = "p",
        "dynamic_partition.buckets" = "64"
    );
    -- "dynamic_partition.start" = "-700",
    ```

    Heads-up: the key/partition columns referenced in the table properties must be placed at the very front of the column list in the CREATE TABLE statement.

    Check dynamic partitions:

    SHOW DYNAMIC PARTITION TABLES;

    1. Show all partitions of a given table in a given db:

    SHOW PARTITIONS FROM example_db.table_name;

    2. Show one specific partition of a given table in a given db:

    SHOW PARTITIONS FROM example_db.table_name WHERE PartitionName = "p1";

    3. To add a partition manually, dynamic partitioning has to be switched off first (see the sketch after these notes):

    ALTER TABLE dw_t_xby_wxuser_snap SET ("dynamic_partition.enable" = "false");

    Otherwise it errors out: Key columns should be a ordered prefix of the schema.

    The bucketing column must be a key column of the table's data model, e.g. one of the DUPLICATE KEY columns.
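    A minimal sketch of adding a partition by hand, using the niaoshu3 table above (the partition name and range value are illustrative):

    ```sql
    -- Dynamic partitioning must be off while adding partitions manually
    ALTER TABLE niaoshu.niaoshu3 SET ("dynamic_partition.enable" = "false");
    -- Add one day's partition; the upper bound is exclusive
    ALTER TABLE niaoshu.niaoshu3 ADD PARTITION p20220801 VALUES LESS THAN ("2022-08-02");
    -- Switch dynamic partitioning back on afterwards if it is still wanted
    ALTER TABLE niaoshu.niaoshu3 SET ("dynamic_partition.enable" = "true");
    ```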

    Importing from HDFS into Doris: 6,228,517 rows (roughly 6.22 million) took about 2 minutes. An excerpt of the fragment profile:

    plan_fragment_executor.cpp:555] Fragment 30b2646b7088479f-bd4d793a9b9dda65: (Active: 4s576ms, % non-child: 0.00%)
    - AverageThreadTokens: 1.00
    - BytesReceived: 6.16 MB
    - DeserializeRowBatchTimer: 21.636ms
    - FirstBatchArrivalWaitTime: 0.000ns
    - PeakReservation: 0
    - PeakUsedReservation: 0
    - RowsProduced: 140.09K
    - SendersBlockedTimer: 0.000ns
    - SendersBlockedTotalTimer(*): 0.000ns
    BlockMgr:
    - BlockWritesOutstanding: 0
    - BlocksCreated: 0
    - BlocksRecycled: 0
    - BufferedPins: 0
    - BytesWritten: 0
    - MaxBlockSize: 8.00 MB
    - MemoryLimit: 2.00 GB
    - TotalBufferWaitTime: 0.000ns
    - TotalEncryptionTime: 0.000ns
    - TotalIntegrityCheckTime: 0.000ns
    - TotalReadBlockTime: 0.000ns
    DataBufferSender (dst_fragment_instance_id=30b2646b7088479f-bd4d793a9b9dda65):
    EXCHANGE_NODE (id=8): (Active: 4s501ms, % non-child: 0.00%)
    - ConvertRowBatchTime: 1.920ms
    - MemoryUsed: 0
    - RowsReturned: 140.09K
    - RowsReturnedRate: 31.12 K/sec

    Use SHOW LOAD to check the status of load jobs; the full syntax and examples are available via HELP SHOW LOAD; (a filtered query is sketched below).
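    For a single job it is usually easier to filter by label. A sketch (the label value is illustrative and follows the niaoshu_YYYYmmddHHMMSS pattern used by the script below):

    ```sql
    SHOW LOAD WHERE LABEL = "niaoshu_20220809023000" ORDER BY CreateTime DESC LIMIT 1;
    ```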

    Scheduled daily import of HDFS data into Doris

    ```shell
    #!/bin/sh
    ### Generate a unique label for each daily run
    label_date=niaoshu_`date +%Y%m%d%H%M%S`
    export MYSQL_PWD='niaoshu123456'
    ### Used to build the data path on HDFS
    dt=`date -d '-1 day' +%Y%m%d`
    ### Used as the partition date written into Doris
    par_date=`date -d '-1 day' +%Y-%m-%d`
    mysql -uroot -h doris01 -P 9030 -e"\
    use wm_dws_db;
    LOAD LABEL wm_dws_db.$label_date
    (
        DATA INFILE('hdfs://h_cluseter/user/hive/warehouse/niaoshu.db/niaoshu3/par_date=$dt/*')
        INTO TABLE niaoshu3
        FORMAT AS 'orc'
        (userid,name,phone)
        SET(par_date=DATE_ADD(curdate(),interval -1 day),uid=userid,name=name,phone=phone)
    )
    WITH BROKER 'niaoshu_broker_hdfs'
    (
        'username'='niaoshu',
        'password'='niaoshu123456',
        'dfs.nameservices'='h_cluseter',
        'dfs.ha.namenodes.h_cluseter'='nn1,nn2',
        'dfs.namenode.rpc-address.h_cluseter.nn1'='hadoop01:8020',
        'dfs.namenode.rpc-address.h_cluseter.nn2'='hadoop02:8020',
        'dfs.client.failover.proxy.provider'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
    )
    PROPERTIES
    (
        'timeout'='3600',
        'max_filter_ratio'='0.001'
    );
    "
    ```

    Check how the load actually went:

    $ mysql -hdoris01 -P 9030 -uroot -pniaoshu123456
    mysql> use wm_dws_db
    mysql> SHOW LOAD;
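    To actually run the script every day, it can be dropped into cron; the script path, run time, and log file below are illustrative:

    ```shell
    # Run the daily broker load at 02:30, after the upstream Hive partition is ready
    30 2 * * * /bin/sh /opt/module/doris/scripts/load_niaoshu3.sh >> /var/log/doris_load_niaoshu3.log 2>&1
    ```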

    4. Import data from Kafka into Doris

    ① Create the table

    ```sql
    CREATE TABLE niaoshu.niaoshu4 (
        par_date date COMMENT 'event date',
        uid bigint COMMENT 'user id, 0 for guests',
        a_name varchar(32) COMMENT 'module name',
        b_time datetime COMMENT 'event time',
        c_os varchar(32) COMMENT 'operating system',
        c_ip varchar(32) COMMENT 'client IP address'
    )
    DUPLICATE KEY(par_date, uid, a_name)
    PARTITION BY RANGE(par_date)()
    DISTRIBUTED BY HASH(par_date, uid, a_name) BUCKETS 32
    PROPERTIES(
        "storage_medium" = "SSD",
        "dynamic_partition.enable" = "true",
        "dynamic_partition.time_unit" = "DAY",
        "dynamic_partition.end" = "3",
        "dynamic_partition.prefix" = "p",
        "dynamic_partition.buckets" = "12",
        "replication_num" = "3"
    );
    ```

    ② Create the routine load job

    ```sql
    CREATE ROUTINE LOAD niaoshu.niaoshu04002 ON niaoshu04
    COLUMNS(p_tmp,uid,a_name,b_time,c_os,c_ip,par_date=from_unixtime(cast(p_tmp as int),'%Y-%m-%d'),event_time=from_unixtime(cast(p_tmp as int)))
    PROPERTIES
    (
        "desired_concurrent_number" = "24",
        "strict_mode" = "false",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "format" = "json",
        "jsonpaths" = "[\"$.date_time\",\"$.uid\",\"$.a_name\",\"$.b_time\",\"$.c_os\",\"$.c_ip\"]"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "hadoop01:9092,hadoop02:9092,hadoop03:9092",
        "kafka_topic" = "testjson",
        "property.group.id" = "niaoshu_data_group_1",
        "property.kafka_default_offsets" = "OFFSET_END",
        "property.enable.auto.commit" = "true",
        "property.client.id" = "niaoshu_id"
    );
    ```
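    To sanity-check the column mapping, a test record whose fields match the jsonpaths above can be pushed into the topic with the console producer (the field values are made up; older Kafka releases use --broker-list, newer ones --bootstrap-server):

    ```shell
    echo '{"date_time":"1659888000","uid":10086,"a_name":"home_page","b_time":"2022-08-08 00:00:00","c_os":"android","c_ip":"10.0.0.1"}' | \
        kafka-console-producer.sh --broker-list hadoop01:9092 --topic testjson
    ```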

    Check the load jobs

    ```sql
    SHOW ALL ROUTINE LOAD;
    ### The detailed commands and examples are available via:
    HELP SHOW ROUTINE LOAD TASK;
    HELP STOP ROUTINE LOAD;
    HELP PAUSE ROUTINE LOAD;
    HELP RESUME ROUTINE LOAD;
    #### Modify the table's column types
    ### Add a column to the table
    alter table niaoshu add column study_time varchar(64) comment 'study time';
    #### Resume the job
    RESUME ROUTINE LOAD for niaoshu04002;
    ### Pause the job
    PAUSE ROUTINE LOAD for niaoshu04002;
    ```

    Kafka-to-Doris import succeeded.

    Note: the Kafka-to-Doris table serves real-time queries over close to 70 million rows.

    To integrate Doris into Hue, just add it the same way a MySQL database is added.

  • Original article: https://blog.csdn.net/zjjcchina/article/details/126231170