2 如果没有表,需要根据标签定义规则建立标签表。
每一个标签一个表
自动生成建表语句(表名、字段名、分区、格式、存储位置)
使用tag_info中的tagcode作为表名
字段类型:可以根据tag_valueType得到tag_value的值【文本,数字,浮点,日期】
分区:标签每天晚上进行计算,计算完成后会产生一批新的数据,可以理解为每日全量
压缩:文本格式,不采用压缩,因为以人和标签作为单位,且后续需要进行计算
create table tableName (uid string,tag_value tag_valueType)
comment tagName
partition by (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
location '/hdfs_path/dbName/tableName'
//2 如果没有表,需要根据标签定义规则建立标签表
// 每一个标签一个表
// 自动生成建表语句(表名、字段名、分区、格式、存储位置)
// 使用tag_info中的tagcode作为表名
// 字段类型:可以根据tag_valueType得到tag_value的值【文本,数字,浮点,日期】
// 分区:标签每天晚上进行计算,计算完成后会产生一批新的数据,可以理解为每日全量
// 压缩:文本格式,不采用压缩,因为以人和标签作为单位,且后续需要进行计算
// create table tableName (uid string,tag_value tag_valueType)
// comment tagName
// partition by (dt string)
// ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
// location '/hdfs_path/dbName/tableName'
// 获取建表语句中的元素
// 获取表名
val tableName = tagInfo.tagCode.toLowerCase()
// 获取tagValueType字段类型(1整数 2浮点 3文本 4日期)
// 常量值一般不写在代码中,所以创建一个常量类,对应标题1
val tagValueType = tagInfo.tagValueType match {
case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"
case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
case ConstCode.TAG_VALUE_TYPE_STRING => "string"
case ConstCode.TAG_VALUE_TYPE_DATE => "string"
}
//config.properties文件中
//hdfs-store.path=hdfs://hadoop101:8020/user_profile:存储位置
//data-warehouse.dbname=gmall:数仓库
//user-profile.dbname=user_profile2022:画像库
val properties: Properties = MyPropertiesUtil.load("config.properties")
val hdfsPath: String = properties.getProperty("hdfs-store.path")
val dwDBName: String = properties.getProperty("data-warehouse.dbname")
val upDBName: String = properties.getProperty("user-profile.dbname")
// 创建sql
val createTableSQL =
s"""
| create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)
| comment '${tagInfo.tagName}'
| partitioned by (dt string)
| ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
| location '$hdfsPath/$upDBName/$tableName'
|""".stripMargin
println(createTableSQL)
sparkSession.sql(createTableSQL)
3 根据标签定义和规则查询数据仓库
4 把数据写入到对应的标签表中
核心:围绕task_info中的task_sql语句进行处理,查出来的数据为UFM,想插入的数据为男女未知,中间需要进行转换,使用case when
select uid, case query_value
when 'F' then '女'
when 'xx' then ''
when 'xx' then '' end as tag_value
from ($sql)
//3 根据标签定义和规则查询数据仓库
//
// select uid, case query_value
// when 'F' then '女'
// when 'xx' then ''
// when 'xx' then '' end as tag_value
// from ($sql)
// 动态生成case when语句
// 3.1 先生成case when语句
//将list中的数据转换成一个一个的when then
//将TaskTagRule(1,7,1,F,8,男), TaskTagRule(2,7,1,M,9,女), TaskTagRule(3,7,1,U,10,未知)
//转换为List( when F then 男, when M then 女, when U then 未知)
val whenThenList: List[String] = taskTagRuleList.map {
taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"
}
//将List转换为字符串,按照空格进行分割
val whenThenSQL: String = whenThenList.mkString(" ")
val selectSQL =
s"""
| select uid,
| case query_value
| $whenThenSQL end as tag_value
| from (${taskInfo.taskSql}) tv
|""".stripMargin
println(selectSQL)
//4 把数据写入到对应的标签表中
//insert执行时,需要跨库操作,从数仓库写入到数仓库,需要添加库名
//画像库的库名容易添加,但数仓的sql是直接传进来的,怎么解决?
//可以定义sql在哪里执行
sparkSession.sql(s"use $dwDBName")
val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"
println(insertSQL)
sparkSession.sql(insertSQL)
package com.hzy.userprofile.app
import java.util.Properties
import com.hzy.userprofile.bean.{TagInfo, TaskInfo, TaskTagRule}
import com.hzy.userprofile.constants.ConstCode
import com.hzy.userprofile.dao.{TagInfoDAO, TaskInfoDAO, TaskTagRuleDAO}
import com.hzy.userprofile.util.MyPropertiesUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object TaskSQLApp {
def main(args: Array[String]): Unit = {
//0 添加执行环境
val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
//1 获得标签定义、标签任务的SQL、字标签的匹配规则,存储在MySQL中
val taskId: String = args(0)
val taskDate: String = args(1)
val tagInfo: TagInfo = TagInfoDAO.getTagInfoByTaskId(taskId)
val taskInfo: TaskInfo = TaskInfoDAO.getTaskInfo(taskId)
val taskTagRuleList: List[TaskTagRule] = TaskTagRuleDAO.getTaskTagRuleListByTaskId(taskId)
println(tagInfo)
println(taskInfo)
println(taskTagRuleList)
//2 如果没有表,需要根据标签定义规则建立标签表
val tableName = tagInfo.tagCode.toLowerCase()
val tagValueType = tagInfo.tagValueType match {
case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"
case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
case ConstCode.TAG_VALUE_TYPE_STRING => "string"
case ConstCode.TAG_VALUE_TYPE_DATE => "string"
}
val properties: Properties = MyPropertiesUtil.load("config.properties")
val hdfsPath: String = properties.getProperty("hdfs-store.path")
val dwDBName: String = properties.getProperty("data-warehouse.dbname")
val upDBName: String = properties.getProperty("user-profile.dbname")
// 创建sql
val createTableSQL =
s"""
| create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)
| comment '${tagInfo.tagName}'
| partitioned by (dt string)
| ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
| location '$hdfsPath/$upDBName/$tableName'
|""".stripMargin
println(createTableSQL)
sparkSession.sql(createTableSQL)
//3 根据标签定义和规则查询数据仓库,生成case when语句,将list中的数据转换成一个一个的when then
val whenThenList: List[String] = taskTagRuleList.map {
taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"
}
val whenThenSQL: String = whenThenList.mkString(" ")
val selectSQL =
s"""
| select uid,
| case query_value
| $whenThenSQL end as tag_value
| from (${taskInfo.taskSql}) tv
|""".stripMargin
println(selectSQL)
//4 把数据写入到对应的标签表中
sparkSession.sql(s"use $dwDBName")
val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"
println(insertSQL)
sparkSession.sql(insertSQL)
}
}

package com.hzy.userprofile.constants
object ConstCode {
val TAG_VALUE_TYPE_LONG="1"
val TAG_VALUE_TYPE_DECIMAL="2"
val TAG_VALUE_TYPE_STRING="3"
val TAG_VALUE_TYPE_DATE="4"
val TASK_PROCESS_SUCCESS="1"
val TASK_PROCESS_ERROR="2"
val TASK_STAGE_START="1"
val TASK_STAGE_RUNNING="2"
}
在task-sql的resources下添加以下几条配置信息
<configuration>
<property>
<name>javax.jdo.option.ConnectionURLname>
<value>jdbc:mysql://hadoop101:3306/metastore?createDatabaseIfNotExist=true&characterEncoding=utf-8&useSSL=falsevalue>
<description>JDBC connect string for a JDBC metastoredescription>
property>
<property>
<name>javax.jdo.option.ConnectionDriverNamename>
<value>com.mysql.jdbc.Drivervalue>
<description>Driver class name for a JDBC metastoredescription>
property>
填写mysql数据库账户密码<-->
<property>
<name>javax.jdo.option.ConnectionUserNamename>
<value>rootvalue>
<description>username to use against metastore databasedescription>
property>
<property>
<name>javax.jdo.option.ConnectionPasswordname>
<value>123456value>
<description>password to use against metastore databasedescription>
property>
<property>
<name>hive.cli.print.headername>
<value>truevalue>
property>
<property>
<name>hive.cli.print.current.dbname>
<value>truevalue>
property>
<property>
<name>hive.metastore.schema.verificationname>
<value>falsevalue>
property>
configuration>
<configuration>
<property>
<name>dfs.namenode.http-addressname>
<value>hadoop101:9870value>
property>
<property>
<name>dfs.namenode.secondary.http-addressname>
<value>hadoop101:9868value>
property>
<property>
<name>dfs.client.use.datanode.hostnamename>
<value>truevalue>
property>
configuration>
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
将task-common\src\main\resources\config.properties剪切到此目录下
原因:不能保证配置信息是公用的,所以在每个任务中单独存放配置信息
hdfs-store.path=hdfs://hadoop101:8020/user_profile
#数仓库
data-warehouse.dbname=gmall
#画像库
user-profile.dbname=user_profile1009
#mysql配置
mysql.url=jdbc:mysql://127.0.0.1:3306/user_profile_manager_1009?characterEncoding=utf-8&useSSL=false
mysql.username=root
mysql.password=root
启动hadoop,hive
# 进入hive创建库
create database user_profile1009;
use user_profile1009;
执行程序会抛出以下异常
Caused by: MetaException(message:Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=ASUS, access=WRITE, inode=“/”:hzy:supergroup:drwxr-xr-x
原因:HDFS文件ower为‘hzy‘,而目前进行调试使用的是windows主机名
解决方案如下图:

其中控制台输出黑色的error可以忽略,运行成功后在hive中可以看到tg_base_persona_gender。
查询可看到统计数据。
以上为在本地运行,现需要将程序打包到服务器,自动运行。

将上述代码注释掉,使用yarn方式部署
//0 添加执行环境
val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app")//.setMaster("local[*]")
在task-sql中的pom文件中,添加一些配置,目的:将所有没有标记成provided的依赖打到jar包中
<build>
<plugins>
<plugin>
<groupId>net.alchim31.mavengroupId>
<artifactId>scala-maven-pluginartifactId>
<version>3.4.6version>
<executions>
<execution>
<goals>
<goal>compilegoal>
<goal>testCompilegoal>
goals>
execution>
executions>
plugin>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-assembly-pluginartifactId>
<version>3.0.0version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependenciesdescriptorRef>
descriptorRefs>
configuration>
<executions>
<execution>
<id>make-assemblyid>
<phase>packagephase>
<goals>
<goal>singlegoal>
goals>
execution>
executions>
plugin>
plugins>
build>
将user-profile-task1009 工程 install
成功标志如下图:

完成后会出现tasksql-1.0-SNAPSHOT-jar-with-dependencies.jar
打开画像管理平台,添加年龄段标签:
继续添加4级标签(60、70、80、90、00):
在3级标签年龄段上添加任务:
启用任务
执行方式:SQL
任务SQL:select id as uid,substr(birthday,3,1) as query_value from dim_user_info ui where dt='9999-99-99'
任务参数:
--driver-memory=1G
--num-executors=3
--executor-memory=2G
--executor-cores=2
--conf spark.default.parallelism=12
标签规则配置
进入流程任务管理 – 上传SQL通用任务jar包
点击上传,在数据库tag_common_task中可以查看到相应数据,如下图

其中id对应 file_info表中的id,其中重要的是file path,去HDFS上查看是否有这个文件
上传所需文件到服务器,修改配置文件
配置文件中callback.http.url 为回调地址,用于更新状态,可以将任务提交的状态实时抓取,并发送给画像管理平台,如果使用的是虚拟机,此处选项直接填windows的虚拟地址即可,一般为192.168.XX.101。
如果使用阿里云服务器,服务器需要能够访问windows的80端口,可以使用内网穿透,这里以花生壳为例。
具体配置如下:

结果:

说明:外网可以通过随机外网域名访问到内网主机,相当于外网域名和内网主机做了一个映射,没有映射,阿里云无法访问到内网。
callback.type=https
callback.http.url=http://m23o108551.zicp.fun/callback/task-status
启动脚本
#! /bin/bash
case $1 in
"start"){
echo " --------启动 $i 远程提交器-------"
nohup java -jar spark-rest-submitter-0.0.3-SNAPSHOT.jar -conf ./application.properties >submitter.log 2>&1 &
};;
"stop"){
echo " --------停止 $i 远程提交器-------"
ps -ef | grep rest-submitter| grep -v grep |awk '{print $2}' | xargs -n1 kill -9
};;
esac
赋予执行权限,执行
./rest-submit.sh start
启动起来之后会出现一个进程 7701 jar
修改E:\develop\MyWork\19用户画像\代码\1\user_profile_manager_0224\src\main\resources\application.properties中的信息,如下,用于画像平台去找spark提交器的位置
# 画像平台上传到hdfs的文件地址和路径
hdfs.url=hdfs://hadoop101:8020
hdfs.username=hzy
hdfs.filedir=/user_profile_manage
# 提交远程服务的配置,之后部署远程spark提交器的地址
spark.rest.submitter.url=http://hadoop101:8266/spark-submit
此时如果不手动调度,程序就会在夜里指定时间自动运行
点击画像平台【流程任务管理】的【手动调度任务】
业务日期,一般选择数仓最后一天的日期,也就是最新的时间,选择业务数据的时间:2020-06-15
在任务进程中可以查看到以下信息

执行完成,任务状态会变为FINISHED
查看yarn服务器结果

hive的user_profile1009库下会产生两张表

查看数据是否正常