• 【用户画像】标签任务开发流程(源码之动态建表、常量类、配置信息、本地调试、发布运行、任务调度)


    一 代码实现

    0 开发主线

    (1)动态建表

    2 如果没有表,需要根据标签定义规则建立标签表。

    每一个标签一个表

    • 自动生成建表语句(表名、字段名、分区、格式、存储位置)
      使用tag_info中的tagcode作为表名

    • 字段类型:可以根据tag_valueType得到tag_value的值【文本,数字,浮点,日期】

    • 分区:标签每天晚上进行计算,计算完成后会产生一批新的数据,可以理解为每日全量

    • 压缩:文本格式,不采用压缩,因为以人和标签作为单位,且后续需要进行计算

      create  table tableName (uid string,tag_value tag_valueType)
      comment tagName
      partition by (dt string)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
      location '/hdfs_path/dbName/tableName'
      
      • 1
      • 2
      • 3
      • 4
      • 5
    //2 如果没有表,需要根据标签定义规则建立标签表
        //    每一个标签一个表
        //    自动生成建表语句(表名、字段名、分区、格式、存储位置)
        //                   使用tag_info中的tagcode作为表名
        //    字段类型:可以根据tag_valueType得到tag_value的值【文本,数字,浮点,日期】
        //    分区:标签每天晚上进行计算,计算完成后会产生一批新的数据,可以理解为每日全量
        //    压缩:文本格式,不采用压缩,因为以人和标签作为单位,且后续需要进行计算
        //    create table tableName (uid string,tag_value tag_valueType)
        //    comment tagName
        //    partition by (dt string)
        //    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
        //    location '/hdfs_path/dbName/tableName'
    
        // 获取建表语句中的元素
        // 获取表名
        val tableName = tagInfo.tagCode.toLowerCase()
    
        // 获取tagValueType字段类型(1整数 2浮点 3文本 4日期)
        // 常量值一般不写在代码中,所以创建一个常量类,对应标题1
        val tagValueType = tagInfo.tagValueType match {
          case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"
          case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
          case ConstCode.TAG_VALUE_TYPE_STRING => "string"
          case ConstCode.TAG_VALUE_TYPE_DATE => "string"
        }
    
        //config.properties文件中
        //hdfs-store.path=hdfs://hadoop101:8020/user_profile:存储位置
        //data-warehouse.dbname=gmall:数仓库
        //user-profile.dbname=user_profile2022:画像库
        val properties: Properties = MyPropertiesUtil.load("config.properties")
        val hdfsPath: String = properties.getProperty("hdfs-store.path")
        val dwDBName: String = properties.getProperty("data-warehouse.dbname")
        val upDBName: String = properties.getProperty("user-profile.dbname")
    
        // 创建sql
        val createTableSQL =
          s"""
             | create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)
             | comment '${tagInfo.tagName}'
             | partitioned by (dt string)
             | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
             | location '$hdfsPath/$upDBName/$tableName'
             |""".stripMargin
    
        println(createTableSQL)
        sparkSession.sql(createTableSQL)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    (2)插入数据

    3 根据标签定义和规则查询数据仓库

    4 把数据写入到对应的标签表中

    • 核心:围绕task_info中的task_sql语句进行处理,查出来的数据为UFM,想插入的数据为男女未知,中间需要进行转换,使用case when

      select uid, case query_value
          when 'F' then '女'
          when 'xx' then ''
          when 'xx' then '' end as tag_value
      from ($sql)
      
      • 1
      • 2
      • 3
      • 4
      • 5
    //3 根据标签定义和规则查询数据仓库
        //
        // select uid, case query_value
        //      when 'F' then '女'
        //      when 'xx' then ''
        //      when 'xx' then '' end as tag_value
        //  from ($sql)
        // 动态生成case when语句
        // 3.1 先生成case when语句
    
        //将list中的数据转换成一个一个的when then
        //将TaskTagRule(1,7,1,F,8,男), TaskTagRule(2,7,1,M,9,女), TaskTagRule(3,7,1,U,10,未知)
        //转换为List( when F then 男,  when M then 女,  when U then 未知)
        val whenThenList: List[String] = taskTagRuleList.map {
          taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"
        }
        //将List转换为字符串,按照空格进行分割
        val whenThenSQL: String = whenThenList.mkString(" ")
        val selectSQL =
          s"""
             | select uid,
             |   case query_value
             |  $whenThenSQL end as tag_value
             | from (${taskInfo.taskSql}) tv
             |""".stripMargin
        println(selectSQL)
    
        //4 把数据写入到对应的标签表中
        //insert执行时,需要跨库操作,从数仓库写入到数仓库,需要添加库名
        //画像库的库名容易添加,但数仓的sql是直接传进来的,怎么解决?
        //可以定义sql在哪里执行
        sparkSession.sql(s"use $dwDBName")
        val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"
        println(insertSQL)
        sparkSession.sql(insertSQL)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    (3)完整语句

    package com.hzy.userprofile.app
    
    import java.util.Properties
    
    import com.hzy.userprofile.bean.{TagInfo, TaskInfo, TaskTagRule}
    import com.hzy.userprofile.constants.ConstCode
    import com.hzy.userprofile.dao.{TagInfoDAO, TaskInfoDAO, TaskTagRuleDAO}
    import com.hzy.userprofile.util.MyPropertiesUtil
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object TaskSQLApp {
      def main(args: Array[String]): Unit = {
    
        //0 添加执行环境
        val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app").setMaster("local[*]")
        val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    
    
        //1 获得标签定义、标签任务的SQL、字标签的匹配规则,存储在MySQL中
        val taskId: String = args(0)
        val taskDate: String = args(1)
        
        val tagInfo: TagInfo = TagInfoDAO.getTagInfoByTaskId(taskId)
        val taskInfo: TaskInfo = TaskInfoDAO.getTaskInfo(taskId)
        val taskTagRuleList: List[TaskTagRule] = TaskTagRuleDAO.getTaskTagRuleListByTaskId(taskId)
    
        println(tagInfo)
        println(taskInfo)
        println(taskTagRuleList)
    
        //2 如果没有表,需要根据标签定义规则建立标签表
        val tableName = tagInfo.tagCode.toLowerCase()
        
        val tagValueType = tagInfo.tagValueType match {
          case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"
          case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
          case ConstCode.TAG_VALUE_TYPE_STRING => "string"
          case ConstCode.TAG_VALUE_TYPE_DATE => "string"
        }
    
        val properties: Properties = MyPropertiesUtil.load("config.properties")
        val hdfsPath: String = properties.getProperty("hdfs-store.path")
        val dwDBName: String = properties.getProperty("data-warehouse.dbname")
        val upDBName: String = properties.getProperty("user-profile.dbname")
    
        // 创建sql
        val createTableSQL =
          s"""
             | create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)
             | comment '${tagInfo.tagName}'
             | partitioned by (dt string)
             | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
             | location '$hdfsPath/$upDBName/$tableName'
             |""".stripMargin
    
        println(createTableSQL)
        sparkSession.sql(createTableSQL)
    
    
        //3 根据标签定义和规则查询数据仓库,生成case when语句,将list中的数据转换成一个一个的when then
        val whenThenList: List[String] = taskTagRuleList.map {
          taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"
        }
        val whenThenSQL: String = whenThenList.mkString(" ")
        val selectSQL =
          s"""
             | select uid,
             |   case query_value
             |  $whenThenSQL end as tag_value
             | from (${taskInfo.taskSql}) tv
             |""".stripMargin
        println(selectSQL)
    
        //4 把数据写入到对应的标签表中
        sparkSession.sql(s"use $dwDBName")
        val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"
        println(insertSQL)
        sparkSession.sql(insertSQL)
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    1 常量类

    在这里插入图片描述

    package com.hzy.userprofile.constants
    
    object ConstCode {
      val TAG_VALUE_TYPE_LONG="1"
      val TAG_VALUE_TYPE_DECIMAL="2"
      val TAG_VALUE_TYPE_STRING="3"
      val TAG_VALUE_TYPE_DATE="4"
    
      val TASK_PROCESS_SUCCESS="1"
      val TASK_PROCESS_ERROR="2"
    
      val TASK_STAGE_START="1"
      val TASK_STAGE_RUNNING="2"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2 配置信息

    在task-sql的resources下添加以下几条配置信息

    (1)hive-site.xml

    
    
    <configuration>
        <property>
            <name>javax.jdo.option.ConnectionURLname>
            <value>jdbc:mysql://hadoop101:3306/metastore?createDatabaseIfNotExist=true&characterEncoding=utf-8&useSSL=falsevalue>
            <description>JDBC connect string for a JDBC metastoredescription>
        property>
    
        <property>
            <name>javax.jdo.option.ConnectionDriverNamename>
            <value>com.mysql.jdbc.Drivervalue>
            <description>Driver class name for a JDBC metastoredescription>
        property>
    
         填写mysql数据库账户密码<-->
        <property>
            <name>javax.jdo.option.ConnectionUserNamename>
            <value>rootvalue>
            <description>username to use against metastore databasedescription>
        property>
    
        <property>
            <name>javax.jdo.option.ConnectionPasswordname>
            <value>123456value>
            <description>password to use against metastore databasedescription>
        property>
        <property>
            <name>hive.cli.print.headername>
            <value>truevalue>
        property>
    
        <property>
            <name>hive.cli.print.current.dbname>
            <value>truevalue>
        property>
        <property>
            <name>hive.metastore.schema.verificationname>
            <value>falsevalue>
        property>
    
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    (2)hdfs-site.xml

    <configuration>
    
        
        <property>
            <name>dfs.namenode.http-addressname>
            <value>hadoop101:9870value>
        property>
        
        <property>
            <name>dfs.namenode.secondary.http-addressname>
            <value>hadoop101:9868value>
        property>
        <property>
            <name>dfs.client.use.datanode.hostnamename>
            <value>truevalue>
    
        property>
    
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    (3)log4j.properties

    log4j.rootLogger=error, stdout,R
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n
    
    log4j.appender.R=org.apache.log4j.RollingFileAppender
    log4j.appender.R.File=../log/agent.log
    log4j.appender.R.MaxFileSize=1024KB
    log4j.appender.R.MaxBackupIndex=1
    
    log4j.appender.R.layout=org.apache.log4j.PatternLayout
    log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    将task-common\src\main\resources\config.properties剪切到此目录下

    原因:不能保证配置信息是公用的,所以在每个任务中单独存放配置信息

    (4)config.properties

    hdfs-store.path=hdfs://hadoop101:8020/user_profile
    #数仓库
    data-warehouse.dbname=gmall
    #画像库
    user-profile.dbname=user_profile1009
    
    #mysql配置
    mysql.url=jdbc:mysql://127.0.0.1:3306/user_profile_manager_1009?characterEncoding=utf-8&useSSL=false
    mysql.username=root
    mysql.password=root
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3 本地调试

    启动hadoop,hive

    # 进入hive创建库
    create database user_profile1009;
    use user_profile1009;
    
    • 1
    • 2
    • 3

    执行程序会抛出以下异常

    Caused by: MetaException(message:Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=ASUS, access=WRITE, inode=“/”:hzy:supergroup:drwxr-xr-x

    原因:HDFS文件ower为‘hzy‘,而目前进行调试使用的是windows主机名

    解决方案如下图:

    在这里插入图片描述

    其中控制台输出黑色的error可以忽略,运行成功后在hive中可以看到tg_base_persona_gender。

    查询可看到统计数据。

    4 发布运行

    (1)环境配置

    以上为在本地运行,现需要将程序打包到服务器,自动运行。

    在这里插入图片描述

    将上述代码注释掉,使用yarn方式部署

    //0 添加执行环境
    val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app")//.setMaster("local[*]")
    
    • 1
    • 2

    在task-sql中的pom文件中,添加一些配置,目的:将所有没有标记成provided的依赖打到jar包中

    <build>
        <plugins>
            
            <plugin>
                <groupId>net.alchim31.mavengroupId>
                <artifactId>scala-maven-pluginartifactId>
                <version>3.4.6version>
                <executions>
                    <execution>
                        
                        <goals>
                            <goal>compilegoal>
                            <goal>testCompilegoal>
                        goals>
                    execution>
                executions>
            plugin>
    
            <plugin>
                <groupId>org.apache.maven.pluginsgroupId>
                <artifactId>maven-assembly-pluginartifactId>
                <version>3.0.0version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependenciesdescriptorRef>
                    descriptorRefs>
                configuration>
                <executions>
                    <execution>
                        <id>make-assemblyid>
                        <phase>packagephase>
                        <goals>
                            <goal>singlegoal>
                        goals>
                    execution>
                executions>
            plugin>
        plugins>
    build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    将user-profile-task1009 工程 install

    成功标志如下图:

    在这里插入图片描述

    完成后会出现tasksql-1.0-SNAPSHOT-jar-with-dependencies.jar

    (2)标签定义

    打开画像管理平台,添加年龄段标签:

    • 上级标签:自然属性
    • 上级标签编码:TG_BASE_PERSONA
    • 标签编码:TG_BASE_PERSONA_AGEGROUP
    • 标签名称:年龄段
    • 标签类型:统计
    • 标签值类型:文本

    继续添加4级标签(60、70、80、90、00):

    • 上级标签:年龄段
    • 上级标签编码:TG_BASE_PERSONA_AGEGROUP
    • 标签编码:TG_BASE_PERSONA_AGEGROUP_60
    • 标签名称:60后
    • 标签类型:统计
    • 标签值类型:文本

    在3级标签年龄段上添加任务:

    • 启用任务

    • 执行方式:SQL

    • 任务SQL:select id as uid,substr(birthday,3,1) as query_value from dim_user_info ui where dt='9999-99-99'

    • 任务参数:

      --driver-memory=1G 
      --num-executors=3 
      --executor-memory=2G 
      --executor-cores=2
      --conf spark.default.parallelism=12
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 标签规则配置

      • 6 60后
      • 7 70后
      • 8 80后
      • 9 90后
      • 0 00后

    (3)上传jar包

    进入流程任务管理 – 上传SQL通用任务jar包

    • 任务主类:com.hzy.userprofile.app.TaskSQLApp
    • jar包上传:上传tasksql-1.0-SNAPSHOT-jar-with-dependencies.jar

    点击上传,在数据库tag_common_task中可以查看到相应数据,如下图

    在这里插入图片描述

    其中id对应 file_info表中的id,其中重要的是file path,去HDFS上查看是否有这个文件

    5 启动调度

    (1)远程提交器部署

    上传所需文件到服务器,修改配置文件

    配置文件中callback.http.url 为回调地址,用于更新状态,可以将任务提交的状态实时抓取,并发送给画像管理平台,如果使用的是虚拟机,此处选项直接填windows的虚拟地址即可,一般为192.168.XX.101。

    如果使用阿里云服务器,服务器需要能够访问windows的80端口,可以使用内网穿透,这里以花生壳为例。

    具体配置如下:

    在这里插入图片描述

    结果:

    在这里插入图片描述

    说明:外网可以通过随机外网域名访问到内网主机,相当于外网域名和内网主机做了一个映射,没有映射,阿里云无法访问到内网。

    callback.type=https
    callback.http.url=http://m23o108551.zicp.fun/callback/task-status
    
    • 1
    • 2

    启动脚本

    #! /bin/bash
    
    case $1 in
    "start"){
                    echo " --------启动 $i 远程提交器-------"
                    nohup java -jar spark-rest-submitter-0.0.3-SNAPSHOT.jar -conf ./application.properties >submitter.log 2>&1 &
    };;
    "stop"){
                    echo " --------停止 $i 远程提交器-------"
                    ps -ef | grep rest-submitter| grep -v grep |awk  '{print $2}' | xargs -n1 kill -9
    
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    赋予执行权限,执行

    ./rest-submit.sh start
    
    • 1

    启动起来之后会出现一个进程 7701 jar

    (2)任务调度

    修改E:\develop\MyWork\19用户画像\代码\1\user_profile_manager_0224\src\main\resources\application.properties中的信息,如下,用于画像平台去找spark提交器的位置

    # 画像平台上传到hdfs的文件地址和路径
    hdfs.url=hdfs://hadoop101:8020
    hdfs.username=hzy
    hdfs.filedir=/user_profile_manage
    
    # 提交远程服务的配置,之后部署远程spark提交器的地址
    spark.rest.submitter.url=http://hadoop101:8266/spark-submit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    此时如果不手动调度,程序就会在夜里指定时间自动运行

    点击画像平台【流程任务管理】的【手动调度任务】

    业务日期,一般选择数仓最后一天的日期,也就是最新的时间,选择业务数据的时间:2020-06-15

    在任务进程中可以查看到以下信息

    在这里插入图片描述

    执行完成,任务状态会变为FINISHED

    查看yarn服务器结果

    在这里插入图片描述

    hive的user_profile1009库下会产生两张表

    在这里插入图片描述

    查看数据是否正常

  • 相关阅读:
    趣学算法 —— 兔子数列
    排课系统.admin.coursetask
    Matlab:设置日期和时间显示格式
    Java项目如何导出数据为 PDF 文件?
    mindspore.dataset.NumpySlicesDataset使用报错
    使用Windows系统自带的安全加密解密文件操作步骤详解
    python网页爬虫xpath应用
    以 ZGC 为例,谈一谈 JVM 是如何实现 Reference 语义的
    MySQL向自增列插入0失败问题
    装了我这 10 个 IDEA 神级插件后,同事也开始情不自禁的嘚瑟了
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/127471380