• 大数据课程L9——网站流量项目的实时业务处理代码


    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

     ▲ 本章节目的

    ⚪ 掌握网站流量项目的SparkStreaming代码;

    ⚪ 掌握网站流量项目的HBaseUtil代码;

    ⚪ 掌握网站流量项目的MysqlUtil代码;

    ⚪ 掌握网站流量项目的LogBean代码;

    ⚪ 掌握网站流量项目的TongjiBean代码;

    一、SparkStreaming代码

    package cn.tedu.kafkasource

    import org.apache.kafka.clients.consumer.ConsumerRecord

    import org.apache.kafka.common.TopicPartition

    import org.apache.kafka.common.serialization.StringDeserializer

    import org.apache.spark.SparkConf

    import org.apache.spark.streaming.dstream.InputDStream

    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    import org.apache.spark.streaming.kafka010._

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import org.apache.spark.SparkContext

    import cn.tedu.pojo.LogBean

    import java.util.Calendar

    import cn.tedu.dao.HBaseUtil

    import cn.tedu.pojo.TongjiBean

    import cn.tedu.dao.MysqlUtil

    object SparkStreaming {

      def main(args: Array[String]): Unit = {

         val conf= new SparkConf().setMaster("local[3]").setAppName("test01")

                .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 

        val sc=new SparkContext(conf)   

        val ssc=new StreamingContext(sc, Seconds(5))   

        val kafkaParams: Map[String, Object] = Map[String, Object](

                "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",

                "key.deserializer" -> classOf[StringDeserializer],

                "value.deserializer" -> classOf[StringDeserializer],

                "group.id" -> "gp2"

            )

        val topics = Array("logdata")

        val kafkaSource=KafkaUtils.createDirectStream[String, String](

                ssc,

                PreferConsistent,

                Subscribe[String, String](topics, kafkaParams)

            ).map(x=>x.value())

        kafkaSource.foreachRDD{rdd=>

         //lines里存储了当前批次内的所有数据 

          val lines=rdd.toLocalIterator

          //遍历迭代器,对每条数据进行处理

          while(lines.hasNext){

            val line=lines.next()

            //第一步:清洗出所需要的业务字段。url,urlname,uvid,ssid,sscount,sstime,cip

            val info=line.split("\\|")

            val url=info(0)

            val urlname=info(1)

            val uvid=info(13)

            val ssid=info(14).split("_")(0)

            val sscount=info(14).split("_")(1)

            val sstime=info(14).split("_")(2)

  • 相关阅读:
    实现video视频缓存
    抖去推短视频seo矩阵系统源代码开发部署分享-开源SaaS
    【目标检测】图像裁剪/标签可视化/图像拼接处理脚本
    计算机网络各层协议总结
    springboot如何将http请求转换为https请求呢?
    软件设计师考试学习1
    区块链与跨链桥的本质理解
    A+B 输入输出练习I(c++基础)
    【JavaScript总结】双等与三等
    《算法导论》学习(十五)----二叉搜索树(C语言)
  • 原文地址:https://blog.csdn.net/u013955758/article/details/132818739