## Hudi Spark-Shell in Practice



### 1.1 Launch spark-shell

When launching spark-shell, the spark-avro module must be specified explicitly because it is not part of the default environment. The spark-avro version has to match the Spark version (Spark 3.1.3 is used here, with spark-avro_2.12:3.1.2 from the same 3.1.x line), and the pre-built Hudi bundle jar is used.

```
bin/spark-shell \
  --jars ./hudi-spark3.1.2-bundle_2.12-0.10.1.jar \
  --packages org.apache.spark:spark-avro_2.12:3.1.2 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```
    
    

```
duo@bigdata100:/bigdata/module/spark-3.1.3-bin-hadoop3.2$ bin/spark-shell --jars ./hudi-spark3.1.2-bundle_2.12-0.10.1.jar --packages org.apache.spark:spark-avro_2.12:3.1.2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
:: loading settings :: url = jar:file:/bigdata/module/spark-3.1.3-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/duo/.ivy2/cache
The jars for the packages stored in: /home/duo/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2d8c5a4f-fa4c-48b0-91da-718ded2a078d;1.0
        confs: [default]
        found org.apache.spark#spark-avro_2.12;3.1.2 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 103ms :: artifacts dl 1ms
        :: modules in use:
        org.apache.spark#spark-avro_2.12;3.1.2 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-2d8c5a4f-fa4c-48b0-91da-718ded2a078d
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/3ms)
22/07/03 06:18:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://bigdata100:4040
Spark context available as 'sc' (master = local[*], app id = local-1656829137163).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.3
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_321)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
```

    
### 1.2 Set the table name, base path, and data generator
    
The DataGenerator can generate sample inserts and updates based on the sample trip schema.
    
    

```
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "dataGen"
val basePath = "file:///tmp/hudi/dataGen"
val dataGen = new DataGenerator
```

    
    

```
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> val tableName = "dataGen"
tableName: String = dataGen

scala> val basePath = "file:///tmp/hudi/dataGen"
basePath: String = file:///tmp/hudi/dataGen

scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@41e5664e
```

    
### 1.3 Insert data
    
    

```
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
```

```
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 1656462490332, "uuid": "94db9618-303f-4919-b1e1-744a8fa3d27e", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1656562122357, "uuid": "06104cbd-c0e1-47ef-aadb-dcc374f7dc92", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1656535160259, "uuid": "44baaf8b-748e-407f-a044-29be6f5ea601", "rider": "rider-213", "driver"...

scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: there was one deprecation warning (since 2.12.0)
warning: there was one deprecation warning (since 2.2.0)
warning: there were two deprecation warnings in total; for details, enable `:setting -deprecation' or `:replay -deprecation'
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> df.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   mode(Overwrite).
     |   save(basePath)
warning: there was one deprecation warning; for details, enable `:setting -deprecation' or `:replay -deprecation'
22/07/03 06:41:25 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
22/07/03 06:41:25 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
```

`mode(Overwrite)` overwrites and recreates the table if it already exists.
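As a side note (a small sketch, not part of the original walkthrough): `getQuickstartWriteConfigs`, used in the write above, is a helper from `org.apache.hudi.QuickstartUtils` that bundles a few write options sized for this small local demo. Printing it in the shell shows exactly what gets passed to the writer:

```scala
// Sketch: inspect the helper write options used throughout this walkthrough.
// getQuickstartWriteConfigs comes from org.apache.hudi.QuickstartUtils (imported earlier).
println(getQuickstartWriteConfigs)
```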
    
Check whether data has been generated under /tmp/hudi/dataGen:
    
    ```
duo@bigdata100:/tmp/hudi/dataGen$ ls
americas  asia
    
    ```
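If you also want to confirm that the write produced a commit (an optional sketch, not in the original post), the table's timeline lives under `<basePath>/.hoodie`, and each completed write leaves commit metadata named after its timestamp:

```scala
// Sketch: list completed commits in the table's timeline directory.
// Assumes the local basePath file:///tmp/hudi/dataGen used above.
new java.io.File("/tmp/hudi/dataGen/.hoodie")
  .listFiles()
  .map(_.getName)
  .filter(_.endsWith(".commit"))
  .foreach(println)
```

Each write in the following sections should add one more entry here.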
    
### 1.4 Query data
    
    ```
    // spark-shell
    val tripsSnapshotDF = spark.
      read.
      format("hudi").
      load(basePath)
    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    
    spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
    spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
    scala> val tripsSnapshotDF = spark.
         |   read.
         |   format("hudi").
         |   load(basePath)
    tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]
    ```
    
    ```
    scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    
    scala> spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
    +------------------+-------------------+-------------------+-------------+
    |              fare|          begin_lon|          begin_lat|           ts|
    +------------------+-------------------+-------------------+-------------+
    | 27.79478688582596| 0.6273212202489661|0.11488393157088261|1656510375408|
    | 93.56018115236618|0.14285051259466197|0.21624150367601136|1656380101888|
    | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1656506788621|
    | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1656535160259|
    | 66.62084366450246|0.03844104444445928| 0.0750588760043035|1656284843592|
    |34.158284716382845|0.46157858450465483| 0.4726905879569653|1656462490332|
    |  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1656562122357|
    | 41.06290929046368| 0.8192868687714224|  0.651058505660742|1656746169980|
    +------------------+-------------------+-------------------+-------------+
    
    
    scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
    +-------------------+--------------------+----------------------+---------+----------+------------------+
    |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
    +-------------------+--------------------+----------------------+---------+----------+------------------+
    |  20220703064125777|9c873a96-6a32-404...|  americas/united_s...|rider-213|driver-213| 27.79478688582596|
    |  20220703064125777|56e400c9-e420-4a7...|  americas/united_s...|rider-213|driver-213|19.179139106643607|
    |  20220703064125777|767a5f82-a82c-4a9...|  americas/united_s...|rider-213|driver-213| 93.56018115236618|
    |  20220703064125777|68ebac6b-0d4e-4bc...|  americas/united_s...|rider-213|driver-213| 33.92216483948643|
    |  20220703064125777|44baaf8b-748e-407...|  americas/united_s...|rider-213|driver-213| 64.27696295884016|
    |  20220703064125777|415033a6-189f-401...|  americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
    |  20220703064125777|94db9618-303f-491...|  americas/brazil/s...|rider-213|driver-213|34.158284716382845|
    |  20220703064125777|06104cbd-c0e1-47e...|  americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
    |  20220703064125777|da3a7e1e-67e5-46c...|    asia/india/chennai|rider-213|driver-213|17.851135255091155|
    |  20220703064125777|39b26187-c626-45b...|    asia/india/chennai|rider-213|driver-213| 41.06290929046368|
    +-------------------+--------------------+----------------------+---------+----------+------------------+
    ```
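If you are curious which columns Hudi adds on top of the generated trip schema, printing the snapshot DataFrame's schema makes the metadata fields explicit (a small optional sketch using the `tripsSnapshotDF` defined above):

```scala
// Sketch: the _hoodie_* columns are Hudi's metadata fields; the rest come from the trip records.
tripsSnapshotDF.printSchema()
```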
    
### 1.5 Update data
    

```
// spark-shell
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
```

    
*   Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
*   Querying the data again will now show the updated trips.
*   Each write operation generates a new commit denoted by its timestamp. Look for changes in the `_hoodie_commit_time`, `rider`, and `driver` fields for the same `_hoodie_record_key`s compared with the previous commit, as the sketch below illustrates.
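To make the last point concrete, here is a small sketch (not from the original post) that re-reads the table after the `Append` write and lists the commit time next to each record key; the view and column names are the same ones used in the queries above:

```scala
// Sketch: after the update, the same _hoodie_record_key should carry a newer _hoodie_commit_time.
spark.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")
spark.sql(
  """select _hoodie_commit_time, _hoodie_record_key, rider, driver, fare
    |from hudi_trips_snapshot
    |order by _hoodie_record_key""".stripMargin).show(false)
```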
        
    
### 1.6 Incremental query
    
    

```
// spark-shell
// reload data
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in

// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select _hoodie_commit_time, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
```

    
    

```
scala> spark.sql("select _hoodie_commit_time, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
+-------------------+------------------+-------------------+-------------------+-------------+
|_hoodie_commit_time|              fare|          begin_lon|          begin_lat|           ts|
+-------------------+------------------+-------------------+-------------------+-------------+
|  20220703065600235|  98.3428192817987| 0.3349917833248327| 0.4777395067707303|1656260221218|
|  20220703065600235|  90.9053809533154|0.19949323322922063|0.18294079059016366|1656632325845|
|  20220703065600235|49.527694252432056| 0.5142184937933181| 0.7340133901254792|1656763853161|
|  20220703065600235| 90.25710109008239| 0.4006983139989222|0.08528650347654165|1656399583609|
|  20220703065600235| 63.72504913279929|  0.888493603696927| 0.6570857443423376|1656568557419|
|  20220703065600235| 86.75932789048282|0.13755354862499358| 0.7180196467760873|1656709831995|
+-------------------+------------------+-------------------+-------------------+-------------+
```

    
    

### 1.7 Point-in-time query

    
    

```
// spark-shell
val beginTime = "000" // Represents all commits > this time.
val endTime = commits(commits.length - 2) // commit time we are interested in

// incrementally query data
val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select _hoodie_commit_time, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
```

    
    

### 1.8 Delete data

    
Only `Append` mode is supported for the delete operation.
    
    

```
// spark-shell
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// fetch two records to be deleted
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// run the same read query as above
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
```

    scala> spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()res17: Long = 10scala> val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)ds: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [uuid: string, partitionpath: string]scala> val deletes = dataGen.generateDeletes(ds.collectAsList())deletes: java.util.List[String] = [{"ts": "0.0","uuid": "9c873a96-6a32-4046-acc1-1abfeeb4c9b1","partitionpath": "americas/united_states/san_francisco"}, {"ts": "0.0","uuid": "56e400c9-e420-4a7c-b553-8730f4d5dbd3","partitionpath": "americas/united_states/san_francisco"}]scala> val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))warning: there was one deprecation warning (since 2.12.0)warning: there was one deprecation warning (since 2.2.0)warning: there were two deprecation warnings in total; for details, enable `:setting -deprecation' or `:replay -deprecation'df: org.apache.spark.sql.DataFrame = [partitionpath: string, ts: string ... 1 more field]scala> df.write.format("hudi").     |   options(getQuickstartWriteConfigs).     |   option(OPERATION_OPT_KEY,"delete").     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").     |   option(TABLE_NAME, tableName).     |   mode(Append).     |   save(basePath)warning: there was one deprecation warning; for details, enable `:setting -deprecation' or `:replay -deprecation'scala> val roAfterDeleteViewDF = spark.     |   read.     |   format("hudi").     |   load(basePath)roAfterDeleteViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]scala> roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")warning: there was one deprecation warning (since 2.0.0); for details, enable `:setting -deprecation' or `:replay -deprecation'scala> spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()res20: Long = 8
    
    ```
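As a final check (a sketch, not in the original post), you can verify that the two deleted keys no longer appear in the snapshot view; `ds` still holds the uuid/partitionpath pairs that were selected for deletion above:

```scala
// Sketch: filtering the snapshot by the deleted uuids should return an empty result.
import org.apache.spark.sql.functions.col
val deletedUuids = ds.collect().map(_.getString(0))
spark.sql("select uuid, partitionpath from hudi_trips_snapshot")
  .filter(col("uuid").isin(deletedUuids: _*))
  .show()
```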
    
    
Original post: https://blog.csdn.net/hyunbar/article/details/126234777