• 一、Azkaban简明笔记


    1、azkaban部署

    主要是集群部署安装。

    1.1 准备安装包

    Downloads (azkaban.github.io)

    1.2 配置MySQL

    1. 启动mysql

      mysql -uroot -proot

    2. 创建azkaban数据库

      create database azkaban;

    3. 创建azkaban用户并赋予权限(可以不设置账号,继续使用root账号)

      1. -- 显示相关变量
      2. SHOW VARIABLES like 'validate_password%';
      3. -- 设置密码有效长度1位及以上
      4. set global validate_password.length=1;
      5. -- 设置密码策略最低级别
      6. set global validate_password_policy=0;
      7. -- 创建Azkaban用户,任何主机都可以访问Azkaban,密码是azkaban
      8. CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban';
      9. -- 赋予Azkaban用户增删改查权限
      10. GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
    4. 创建azkaban的表

      source /opt/software/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql;
    5. 更改mysql包大小,防止azkaban连接mysql阻塞

      1. sudo vim /etc/my.cnf
      2. # 在[mysqld]下面加一行max_allowed_packet=1024M
      3. [mysqld]
      4. max_allowed_packet=1024M
    6. 重启mysql

      sudo systemctl restart mysqld

    1.3 配置Executor Server

    Azkaban Executor Server处理工作流和作业的实际执行。

    1. 编辑azkaban.properties

      vim /opt/software/azkaban/azkaban-exec-server-3.84.4/conf/azkaban.properties

      修改如下属性:

      1. # 修改如下内容
      2. default.timezone.id=Asia/Shanghai
      3. azkaban.webserver.url=http://node001:8081
      4. executor.port=12321
      5. database.type=mysql
      6. mysql.port=3306
      7. mysql.host=node001
      8. mysql.database=azkaban
      9. mysql.user=azkaban
      10. mysql.password=azkaban
      11. mysql.numconnections=100
      12. # 添加如下内容
      13. executor.metric.reports=true
      14. executor.metric.milisecinterval.default=60000
    2. 进入.../azkaban-exec-server-3.84.4/lib 更新 mysql-connector-java-8.0.29.jar包,使其与mysql版本匹配

    3. 同步azkaban-exec-server-3.84.4到所有节点

    4. 在各节点的**.../azkaban-exec-server-3.84.4/ **目录分别执行

      bin/start-exec.sh

      如果出现executor.port文件,说明启动成功。

    5. 在各个节点激活executor

      1. curl -G "node001:$(<./executor.port)/executor?action=activate" && echo
      2. curl -G "node002:$(<./executor.port)/executor?action=activate" && echo
      3. curl -G "node003:$(<./executor.port)/executor?action=activate" && echo

      如果节点出现提示:{"status":"success"},表示激活成功。

    1.4 配置Web Server

    Azkaban Web Server处理项目管理,身份验证,计划和执行触发。

    1. 编辑azkaban.properties

      1. 修改如下:
      2. default.timezone.id=Asia/Shanghai
      3. database.type=mysql
      4. mysql.port=3306
      5. mysql.host=node001
      6. mysql.database=azkaban
      7. mysql.user=root
      8. mysql.password=root
      9. mysql.numconnections=100
      10. azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus

      说明:

      • StaticRemainingFlowSize:正在排队的任务数

      • CpuStatus:CPU占用情况

      • MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行

    2. 修改azkaban-users.xml文件,添加新用户

      1. <azkaban-users>
      2. <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
      3. <user password="metrics" roles="metrics" username="metrics"/>
      4. <user password="azkaban" roles="metrics,admin" username="nuochengze"/>
      5. <role name="admin" permissions="ADMIN"/>
      6. <role name="metrics" permissions="METRICS"/>
      7. azkaban-users>
    3. 进入.../azkaban-web-server-3.84.4/lib 更新 mysql-connector-java-8.0.29.jar包,使其与mysql版本匹配

    4. 进入...//azkaban-web-server-3.84.4/目录,启动web Server

      ./bin/start-web.sh
    5. 访问http://node001:8081/,并用配置的账号登录

    2、使用

    2.1 HelloWorld案例

    1. 新建azkaban.project文件,编辑内容如下:

      azkaban-flow-version: 2.0

      该文件的表明,采用新的Flow-API方式解析flow文件

    2. 新建hello_world.flow文件,编辑内容如下:

      1. nodes:
      2. - name: jobA # job的名称
      3. type: command # job的类型,command表示要执行作业的方式为命令
      4. config: # job的配置信息
      5. command: echo "hello world"
    3. 将azkaban.project、hello_world.flow文件压缩到一个zip文件

      说明:文件名称必须是英文

    4. 在WebServer新建一个项目:http://node001:8081/index

      1. 给项目名称命名和添加项目描述
      2. 将zip包文件上传
      3. 执行Execute Flow
      4. Job List中查看运行结果

    2.2 作业依赖案例

    需求:JobA和JobB执行完了,才能执行JobC

    步骤:

    1. 创建basic.flow文件,添加如下内容

      1. nodes:
      2. - name: JobC
      3. type: command
      4. dependsOn:
      5. - JobA
      6. - JobB
      7. config:
      8. command: echo "I'm JobC"
      9. - name: JobA
      10. type: command
      11. config:
      12. command: echo "I'm JobA"
      13. - name: JobB
      14. type: command
      15. config:
      16. command: echo "I'm JobB"

      dependsOn后面表示当前Job依赖的其他Job

    2. 创建azkaban.project文件,编辑内容如下:

      azkaban-flow-version: 2.0
    3. basic.flowazkaban.project文件压缩成dependson_example.zip文件

    4. 在WebServer新建一个项目:http://node001:8081/index

      1. 给项目名称命名和添加项目描述
      2. 将zip包文件上传
      3. 执行Execute Flow
      4. Job List中查看运行结果

    2.3 自动失败重试案例

    需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms

    修改basic.flow文件:

    1. nodes:
    2. - name: JobA
    3. type: command
    4. config:
    5. command: sh /not_exists.sh
    6. retries: 3
    7. retry.backoff: 10000

    参数说明:

    • retries:重试次数
    • retry.backoff: 重试的时间间隔

    执行的次数= 一次失败+三次重试

    全局配置的方式:

    1. # 在Flow全局配置中添加任务失败配置,此时重试配置会应用到所有Job
    2. config:
    3. retires: 3
    4. retry.backoff: 10000
    5. nodes:
    6. - name: JobA
    7. type: command
    8. config:
    9. command: sh /not_exists.sh

    2.4 手动失败重试案例

    Enable 和 Disable 下面都分别有如下参数:

    • Parents:该作业的上一个任务

    • Ancestors:该作业前的所有任务

    • Children:该作业后的一个任务

    • Descendents:该作业后的所有任务

    • Enable All:所有的任务

    2.5 JavaProcess作业类型案例

    JavaProcess 类型可以运行一个自定义主类方法,

    type 类型为 javaprocess,

    可用的配置为:

    • Xms:最小堆
    • Xmx:最大堆
    • classpath:类路径
    • java.class:要运行的 Java 对象,其中必须包含 Main 方法
    • main.args:main 方法的参数

    步骤:

    1. 新建一个azkaban的maven工程

    2. 创建包名:com.nuochengze

    3. 创建AzTest类:

      1. package com.nuochengze
      2. public class AzTest {
      3. public static void main(String[] args){
      4. System.out.println("This is a test.");
      5. }
      6. }
    4. 打包成jar包

    5. 修改basic.flow文件

      1. nodes:
      2. - name: test_java
      3. type: javaprocess
      4. config:
      5. Xms: 96M
      6. Xmx: 200M
      7. java.class: com.nuochengze.AzTest
    6. 将jar包和basic.flow文件及azkaban.project文件打包成zip包

    2.6 条件工作流案例

    条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job 的父 Job 输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定 Job 执行逻辑时获得更大的灵活性,例如,只要父 Job 之一成功,就可以运行当前 Job。

    运行时参数案例:

    1. 基本原理

      • 父 Job 将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件

      • 子 Job 使用 ${jobName:param}来获取父 Job 输出的参数并定义执行条件

    2. 支持的条件运算符

      • == 等于
      • != 不等于
      • > 大于
      • >= 大于等于
      • < 小于
      • <= 小于等于
      • && 与
      • || 或
      • ! 非

    案例:

    • 需求:JobA执行一个shell脚本,JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行

    • 步骤:

      1. 新建jobA.sh

        1. #! /bin/bash
        2. echo "do JobA"
        3. wk=`date +%w`
        4. echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
      2. 新建JobB.sh

        1. #!/bin/bash
        2. echo "do JobB"
      3. 新建basic.flow

        1. nodes:
        2. - name: JobA
        3. type: command
        4. config:
        5. command: sh JobA.sh
        6. - name: JobB
        7. type: command
        8. dependsOn:
        9. - JobA
        10. config:
        11. command: sh JobB.sh
        12. condition: ${JobA:wk} == 1

        按照设定条件,JobB会根据当日日期决定是否执行

    2.7 预定义宏案例

    Azkaban 中预置了几个特殊的判断条件,称为预定义宏。 预定义宏会根据所有父 Job 的完成情况进行判断,再决定是否执行。

    可用的预定义宏如 下:

    • all_success: 表示父 Job 全部成功才执行(默认)
    • all_done:表示父 Job 全部完成才执行
    • all_failed:表示父 Job 全部失败才执行
    • one_success:表示父 Job 至少一个成功才执行
    • one_failed:表示父 Job 至少一个失败才执行

    案例:

    • 需求

      JobA 执行一个 shell 脚本 ,JobB 执行一个 shell 脚本,JobC 执行一个 shell 脚本,要求 JobA、JobB 中有一个成功即可执行

    • 步骤

      1. 新建JobA.sh

        1. #!/bin/bash
        2. echo "do JobA"
      2. 新建JobC.sh

        1. #!/bin/bash
        2. echo "do JobC
      3. 新建basic.flow

        1. nodes:
        2. - name: JobA
        3. type: command
        4. config:
        5. command: sh JobA.sh
        6. - name: JobB
        7. type: command
        8. config:
        9. command: sh JobB.sh
        10. - name: JobC
        11. type: command
        12. depondsOn:
        13. - JobA
        14. - JobB
        15. config:
        16. command: sh JobC.sh
        17. condition: one_success

        注意:没有JobB.sh

    2.8 定时执行

    • Azkaban 可以定时执行工作流。在执行工作流时候,选择左下角 Schedule
    • 右上角注意时区是上海,然后在左面填写具体执行事件,填写的方法和 crontab 配置定时 任务规则一致
    • 点击 remove Schedule 即可删除当前任务的调度规则

    2.9 邮箱报警案例

    1. 注册邮箱并开启smtp,获取第三方客户端授权码

    2. Azkaban 默认支持通过邮件对失败的任务进行报警,配置方法如下:

      1. 在 azkaban-web 节 点 node001上,编辑.../azkaban-web-server-3.84.4/conf/azkaban.properties,并修改如下内容:

        1. #这里设置邮件发送服务器
        2. mail.sender=xxxx@126.com
        3. mail.host=smtp.126.com
        4. mail.user=xxxx@126.com
        5. mail.password=用邮箱的授权码
      2. 保存并重启web-server

      3. 页面配置

    2.10 Azkaban多Executor模式注意事项

    Azkaban 多 Executor 模式是指,在集群中多个节点部署 Executor。在这种模式下, Azkaban web Server 会根据策略,选取其中一个 Executor 去执行任务。

    为确保所选的 Executor 能够准确的执行任务,我们须在以下两种方案任选其一,推荐使 用方案二。

    • 方案一:指定特定的 Executor(hadoop102)去执行任务。

      1. 在 MySQL 中 azkaban 数据库 executors 表中,查询 node001上的 Executor 的 id。

      2. 在执行工作流程时加入 useExecutor 属性,如下

    • 方案二:在 Executor 所在所有节点部署任务所需脚本和应用。
  • 相关阅读:
    原生安装maven和java
    某环保制造企业核心人才培养项目成功案例纪实
    cURL 工具使用
    go slice切片的详细知识(包含底层扩容)——2
    css 实现文字流光效果
    【MySQL】专栏合集,从基础概念到调优
    G. Good Key, Bad Key(思维)
    点云绪论(点云数据及获取、点云数据处理、常用软件及开源库)
    LM小型可编程控制器软件(基于CoDeSys)笔记二十:plc通过驱动器控制步进电机
    【 网络常见的 9 大命令,非常实用!】
  • 原文地址:https://blog.csdn.net/Norni/article/details/126654304