1、azkaban部署
主要是集群部署安装。
1.1 准备安装包
1.2 配置MySQL
-
mysql -uroot -proot -
创建azkaban数据库
create database azkaban; -
创建azkaban用户并赋予权限(可以不设置账号,继续使用root账号)
- -- 显示相关变量
- SHOW VARIABLES like 'validate_password%';
- -- 设置密码有效长度1位及以上
- set global validate_password.length=1;
- -- 设置密码策略最低级别
- set global validate_password_policy=0;
- -- 创建Azkaban用户,任何主机都可以访问Azkaban,密码是azkaban
- CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban';
- -- 赋予Azkaban用户增删改查权限
- GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
-
创建azkaban的表
source /opt/software/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql; -
更改mysql包大小,防止azkaban连接mysql阻塞
- sudo vim /etc/my.cnf
- # 在[mysqld]下面加一行max_allowed_packet=1024M
- [mysqld]
- max_allowed_packet=1024M
-
重启mysql
sudo systemctl restart mysqld
1.3 配置Executor Server
Azkaban Executor Server处理工作流和作业的实际执行。
-
编辑azkaban.properties
vim /opt/software/azkaban/azkaban-exec-server-3.84.4/conf/azkaban.properties修改如下属性:
- # 修改如下内容
- default.timezone.id=Asia/Shanghai
- azkaban.webserver.url=http://node001:8081
- executor.port=12321
- database.type=mysql
- mysql.port=3306
- mysql.host=node001
- mysql.database=azkaban
- mysql.user=azkaban
- mysql.password=azkaban
- mysql.numconnections=100
- # 添加如下内容
- executor.metric.reports=true
- executor.metric.milisecinterval.default=60000
-
进入
.../azkaban-exec-server-3.84.4/lib更新mysql-connector-java-8.0.29.jar包,使其与mysql版本匹配 -
同步
azkaban-exec-server-3.84.4到所有节点 -
在各节点的**.../azkaban-exec-server-3.84.4/ **目录分别执行
bin/start-exec.sh如果出现executor.port文件,说明启动成功。
-
在各个节点激活executor
- curl -G "node001:$(<./executor.port)/executor?action=activate" && echo
- curl -G "node002:$(<./executor.port)/executor?action=activate" && echo
- curl -G "node003:$(<./executor.port)/executor?action=activate" && echo
如果节点出现提示:{"status":"success"},表示激活成功。
1.4 配置Web Server
Azkaban Web Server处理项目管理,身份验证,计划和执行触发。
-
编辑
azkaban.properties- 修改如下:
- default.timezone.id=Asia/Shanghai
- database.type=mysql
- mysql.port=3306
- mysql.host=node001
- mysql.database=azkaban
- mysql.user=root
- mysql.password=root
- mysql.numconnections=100
- azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
说明:
-
StaticRemainingFlowSize:正在排队的任务数
-
CpuStatus:CPU占用情况
-
MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行
-
修改
azkaban-users.xml文件,添加新用户- <azkaban-users>
- <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
- <user password="metrics" roles="metrics" username="metrics"/>
- <user password="azkaban" roles="metrics,admin" username="nuochengze"/>
- <role name="admin" permissions="ADMIN"/>
- <role name="metrics" permissions="METRICS"/>
- azkaban-users>
-
进入
.../azkaban-web-server-3.84.4/lib更新mysql-connector-java-8.0.29.jar包,使其与mysql版本匹配 -
进入
...//azkaban-web-server-3.84.4/目录,启动web Server./bin/start-web.sh -
访问
http://node001:8081/,并用配置的账号登录
2、使用
2.1 HelloWorld案例
-
新建azkaban.project文件,编辑内容如下:
azkaban-flow-version: 2.0该文件的表明,采用新的Flow-API方式解析flow文件
-
新建hello_world.flow文件,编辑内容如下:
- nodes:
- - name: jobA # job的名称
- type: command # job的类型,command表示要执行作业的方式为命令
- config: # job的配置信息
- command: echo "hello world"
-
将azkaban.project、hello_world.flow文件压缩到一个zip文件
说明:文件名称必须是英文
-
在WebServer新建一个项目:
http://node001:8081/index- 给项目名称命名和添加项目描述
- 将zip包文件上传
- 执行
Execute Flow - 在
Job List中查看运行结果
2.2 作业依赖案例
需求:JobA和JobB执行完了,才能执行JobC
步骤:
-
创建
basic.flow文件,添加如下内容- nodes:
- - name: JobC
- type: command
- dependsOn:
- - JobA
- - JobB
- config:
- command: echo "I'm JobC"
- - name: JobA
- type: command
- config:
- command: echo "I'm JobA"
- - name: JobB
- type: command
- config:
- command: echo "I'm JobB"
dependsOn后面表示当前Job依赖的其他Job
-
创建
azkaban.project文件,编辑内容如下:azkaban-flow-version: 2.0 -
将
basic.flow和azkaban.project文件压缩成dependson_example.zip文件 -
在WebServer新建一个项目:
http://node001:8081/index- 给项目名称命名和添加项目描述
- 将zip包文件上传
- 执行
Execute Flow - 在
Job List中查看运行结果
2.3 自动失败重试案例
需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms
修改basic.flow文件:
- nodes:
- - name: JobA
- type: command
- config:
- command: sh /not_exists.sh
- retries: 3
- retry.backoff: 10000
参数说明:
- retries:重试次数
- retry.backoff: 重试的时间间隔
执行的次数= 一次失败+三次重试
全局配置的方式:
- # 在Flow全局配置中添加任务失败配置,此时重试配置会应用到所有Job
-
- config:
- retires: 3
- retry.backoff: 10000
- nodes:
- - name: JobA
- type: command
- config:
- command: sh /not_exists.sh
2.4 手动失败重试案例
Enable 和 Disable 下面都分别有如下参数:
-
Parents:该作业的上一个任务
-
Ancestors:该作业前的所有任务
-
Children:该作业后的一个任务
-
Descendents:该作业后的所有任务
-
Enable All:所有的任务
2.5 JavaProcess作业类型案例
JavaProcess 类型可以运行一个自定义主类方法,
type 类型为 javaprocess,
可用的配置为:
- Xms:最小堆
- Xmx:最大堆
- classpath:类路径
- java.class:要运行的 Java 对象,其中必须包含 Main 方法
- main.args:main 方法的参数
步骤:
-
新建一个azkaban的maven工程
-
创建包名:com.nuochengze
-
创建AzTest类:
- package com.nuochengze
- public class AzTest {
- public static void main(String[] args){
- System.out.println("This is a test.");
- }
- }
-
打包成jar包
-
修改
basic.flow文件- nodes:
- - name: test_java
- type: javaprocess
- config:
- Xms: 96M
- Xmx: 200M
- java.class: com.nuochengze.AzTest
-
将jar包和
basic.flow文件及azkaban.project文件打包成zip包
2.6 条件工作流案例
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job 的父 Job 输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定 Job 执行逻辑时获得更大的灵活性,例如,只要父 Job 之一成功,就可以运行当前 Job。
运行时参数案例:
-
基本原理
-
父 Job 将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件
-
子 Job 使用 ${jobName:param}来获取父 Job 输出的参数并定义执行条件
-
-
支持的条件运算符
- == 等于
- != 不等于
- > 大于
- >= 大于等于
- < 小于
- <= 小于等于
- && 与
- || 或
- ! 非
案例:
-
需求:JobA执行一个shell脚本,JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行
-
步骤:
-
新建jobA.sh
- #! /bin/bash
- echo "do JobA"
- wk=`date +%w`
- echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
-
新建JobB.sh
- #!/bin/bash
- echo "do JobB"
-
新建basic.flow
- nodes:
- - name: JobA
- type: command
- config:
- command: sh JobA.sh
- - name: JobB
- type: command
- dependsOn:
- - JobA
- config:
- command: sh JobB.sh
- condition: ${JobA:wk} == 1
按照设定条件,JobB会根据当日日期决定是否执行
-
2.7 预定义宏案例
Azkaban 中预置了几个特殊的判断条件,称为预定义宏。 预定义宏会根据所有父 Job 的完成情况进行判断,再决定是否执行。
可用的预定义宏如 下:
- all_success: 表示父 Job 全部成功才执行(默认)
- all_done:表示父 Job 全部完成才执行
- all_failed:表示父 Job 全部失败才执行
- one_success:表示父 Job 至少一个成功才执行
- one_failed:表示父 Job 至少一个失败才执行
案例:
-
需求
JobA 执行一个 shell 脚本 ,JobB 执行一个 shell 脚本,JobC 执行一个 shell 脚本,要求 JobA、JobB 中有一个成功即可执行
-
步骤
-
新建JobA.sh
- #!/bin/bash
- echo "do JobA"
-
新建JobC.sh
- #!/bin/bash
- echo "do JobC
-
新建basic.flow
- nodes:
- - name: JobA
- type: command
- config:
- command: sh JobA.sh
- - name: JobB
- type: command
- config:
- command: sh JobB.sh
- - name: JobC
- type: command
- depondsOn:
- - JobA
- - JobB
- config:
- command: sh JobC.sh
- condition: one_success
注意:没有JobB.sh
-
2.8 定时执行
- Azkaban 可以定时执行工作流。在执行工作流时候,选择左下角 Schedule
- 右上角注意时区是上海,然后在左面填写具体执行事件,填写的方法和 crontab 配置定时 任务规则一致
- 点击 remove Schedule 即可删除当前任务的调度规则
2.9 邮箱报警案例
-
注册邮箱并开启smtp,获取第三方客户端授权码
-
Azkaban 默认支持通过邮件对失败的任务进行报警,配置方法如下:
-
在 azkaban-web 节 点 node001上,编辑
.../azkaban-web-server-3.84.4/conf/azkaban.properties,并修改如下内容:- #这里设置邮件发送服务器
- mail.sender=xxxx@126.com
- mail.host=smtp.126.com
- mail.user=xxxx@126.com
- mail.password=用邮箱的授权码
-
保存并重启web-server
-
页面配置
-
2.10 Azkaban多Executor模式注意事项
Azkaban 多 Executor 模式是指,在集群中多个节点部署 Executor。在这种模式下, Azkaban web Server 会根据策略,选取其中一个 Executor 去执行任务。
为确保所选的 Executor 能够准确的执行任务,我们须在以下两种方案任选其一,推荐使 用方案二。
-
方案一:指定特定的 Executor(hadoop102)去执行任务。
-
在 MySQL 中 azkaban 数据库 executors 表中,查询 node001上的 Executor 的 id。
-
在执行工作流程时加入 useExecutor 属性,如下
-
- 方案二:在 Executor 所在所有节点部署任务所需脚本和应用。
