Spark学习案例实操 尚硅谷 电商用户访问数据案例

文章目录

    • 概要
    • 整体案例需求
    • 1. 获取Top10的品类
    • 2.获取Top10品类后 找到每个品类里活跃度Top10的用户id
    • 3.计算单页跳转率

      概要

      Spark和Scala混合学习也有接近两周了,最近学完了RDD数据结构,以及一堆算子,跟着课程做第一个案例实操复习,现做一个学习笔记。

      整体案例需求

      这个案例需求一共分为三个:

      1.找到排名前十的品类,排名逻辑跟 点击量/下单量/支付量有关

      即先比点击量 如果一样再比下单量 如果还一样最后比支付量。

      2.找到排名前十品类里每个品类里活跃度排名前十的用户

      即根据用户名出现的次数排序得到排名前十的用户

      3.计算单页跳转率

      1. 获取Top10的品类

      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}
      object test_project2_pro2 { def main(args: Array[String]): Unit = { // 配置spark运行环境
          val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test_rdd")
          val sc = new SparkContext(sparkConf) 
          
        // 获取文件rdd 	
          val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6)
          // 为了防止rdd被调用多次 生成多个数据源 造成数据重复调用 浪费内存资源
          // 可以在rdd创建后添加一个持久化
          val rdd_cache: rdd.type = rdd.cache()
          // 在点击/下单/支付三个数据源里 最终得到的rdd里将数据格式由(品类id,次数)修改为(品类id,(点击数量,下单数量,支付数量))
          // 在点击数据源里该数据为(品类id,(点击数量,0,0)) 其他两个类似只不过0变了位置
          // 过滤非品类数据行
          val rdd1: RDD[String] = rdd.filter(x => { var datas = x.split("_")(6)
            datas != "-1"
          })
          // 将品类数据行转化成 频率tuple (品类id,1)
          // 再根据品类id 做key 进行聚合操作得到 (品类id,sum)
          val rdd2 = rdd1.map(x => { val category: String = x.split("_")(6)
            (category, (1,0,0))
          })
          // 过滤非下单数据行
          val rdd3: RDD[String] = rdd_cache.filter(x => { var datas = x.split("_")(8)
            datas != "null"
          })
          // 下单的品类ID是连续的 因为存在一次下单多个品类的情况 即可能得到 1,2,3 这样的字段
          // 需要扁平化处理一下 将1,2,3 转化成 [1,2,3]的独立元素
          // 再通过map 添加频率计数 将[1,2,3]转化成[(1,1),(2,1),(3,1)]
          // 要注意这里必须要用flatmap 进行扁平化处理 ,否则直接使用map处理得到的是二维的数据 即每行数据转换成了数组
          // 使用flatmap则不会 得到的是list即把每行数据得到的数组进行拆分变成一个一个的元素
          // 得到下单数据单位元素 由于使用了flatmap 最终得到的rdd里元素格式就是(string,(int,int,int))
          val rdd5 = rdd3.flatMap(x => { val orders: String = x.split("_")(8)
            val order = orders.split(",")
            order.map(x => (x, (0,1,0)))
          })
          // 这里如果使用map得到的是数组 相当于每行的string变成了array 并没有降维
          //    val rdd4: RDD[Array[(String, Int)]] = rdd3.map(x => { //      val order: String = x.split("_")(8)
          //      val orders = order.split(",").map(x => (x, 1))
          //      orders
          //    })
          // 过滤非支付数据行
          val rdd6: RDD[String] = rdd_cache.filter(x => { var datas = x.split("_")(10)
            datas != "null"
          })
          // 得到支付数据单位元素 同下单一样
          val rdd7 = rdd6.flatMap(x => { val pays: String = x.split("_")(10)
            val pay = pays.split(",")
            pay.map(x => (x, (0,0,1)))
          })
          // 现在我们统计了各品类id 在点击/下单/支付里出现的次数
          // 我们现在想选出品类id 在三者综合数量排名前十的品类
          // 我们需要使用cogroup聚合三个rdd 得到(key:品类id,value:(点击数量 下单数量 支付数量))这样的数据类型
          // 要注意的是 cogroup得到的数据类型里 value里的元素是迭代器 但是实际上我们聚合的三组里以及提前统计了一遍聚合
          // 所以这里虽然是迭代器 但每个迭代器里只有一个元素 因为在聚合的三个rdd里每个品类id已经被聚合过了 只出现了一次
          // 使用mapvalues处理cogroup得到的rdd里的每个品类id key对应的value 转化为(点击,下单,支付)的格式
          /**
           * val rdd_final: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = rdd2.cogroup(rdd5, rdd7)
           * val rdd_final_ : RDD[(String, (Int, Int, Int))] = rdd_final.mapValues(v => { * var click = 0
           * val i1 = v._1.iterator
           * if (i1.hasNext) { * click = i1.next()
           * }
           * var order = 0
           * val i2 = v._2.iterator
           * if (i2.hasNext) { * order = i2.next()
           * }
           * var pay = 0
           * val i3 = v._3.iterator
           * if (i3.hasNext) { * pay = i3.next()
           * }
           * (click, order, pay)
           * })
           * */
          // 当我们把三个数据源的数据格式修改为(key,(click,order,pay))时,即可将三个数据源直接合并然后聚合
          val rdd_final: RDD[(String, (Int, Int, Int))] = rdd2.union(rdd5).union(rdd7).reduceByKey((v1, v2) => {
            (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3)
          })
          rdd_final.saveAsTextFile("output_project2")
          val rdd_top10: Array[(String, (Int, Int, Int))] = rdd_final.sortBy(x => x._2, false).take(10)
          rdd_top10.foreach(println)
        }
      }
      

      2.获取Top10品类后 找到每个品类里活跃度Top10的用户id

      那我们首先要获得Top10的品类 得到一个list,

      然后做数据筛选 把不是这10个品类的数据行去掉,最后在筛选后的数据里进行(品类,用户)双变量的分组,得到用户出现的次数,然后改变数据结构从((品类,用户),sum次数)->(品类,(用户,sum次数)),然后进行sortby排序(降序),得到最终结果

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.rdd.RDD
      object test_project2_pro3 { // 当我们统计出top10的品类id后,我们想找到top10品类里每个品类下top10的session以及他们的活跃次数
        // 首先将获取top10品类的rdd计算逻辑打包成方法
        def top10_category(rdd: RDD[String]): List[String] = { rdd.flatMap(x => { var datas = x.split("_")
            if (datas(6) != "-1") { List((datas(6), (1, 0, 0)))
            }
            else if (datas(8) != "null") { val orders: Array[String] = datas(8).split(",")
              orders.map(x => { (x, (0, 1, 0))
              })
            }
            else if (datas(10) != "null") { val pays: Array[String] = datas(10).split(",")
              pays.map(x => { (x, (0, 1, 0))
              })
            }
            else { Nil
            }
          }).reduceByKey((v1, v2) => {
            (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3)
          }).sortBy(x => x._2, false).map(x => x._1).take(10).toList
        }
        def main(args: Array[String]): Unit = {
          val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test_rdd")
          val sc = new SparkContext(sparkConf)
          val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6)
          // 定义好获得top品类的方法后 我们接下来就可以根据top品类筛数据然后根据(品类,用户)多重排序得到用户数量
          val top10_c: List[String] = top10_category(rdd)
          val rdd2: RDD[(String, List[(String, Int)])] = rdd.filter(x => {
            var category = x.split("_")(6)
            if (category != "-1" & top10_c.contains(category)) { true
            }
            else { false
            }
          }).map(x => { var category = x.split("_")(6)
            var session = x.split("_")(2)
            ((category, session), 1)
          }).reduceByKey((a, b) => {
            a + b
          }).map(x => {
            (x._1._1, (x._1._2, x._2))
          }).groupByKey().mapValues(v => { v.toList.sortBy(x => x._2)(Ordering.Int.reverse).take(10)
          })
          rdd2.collect().foreach(println)
        }
      }
      

      3.计算单页跳转率

      这里要有个概念,首先session会话每个用户都会发生,而且计算跳转率的时候必须保证用户会话发生的记录是时间顺序的,这样的跳转才有意义,那么流程就很清晰了:

      首先指定出想计算的有意义的跳转率(例如首页->详情,详情->下单等),需要先列一个表,例如页面id

      1->2,2->3,3->4 在list里表示为((1,2),(2,3),(3,4)),如何实现这个链接表?需要用到zip算子,把相同位置的两个rdd连在一起,即rdd1=(1,2,3,4),rdd2=(2,3,4),那么rdd1.init.zip(rdd2)=((1,2),(2,3),(3,4)),这就是分子,那有了分子,我们需要分母,那分母是什么呢?想想看如果想计算1->2的跳转率=(1,2)出现的次数/1出现的次数,所以分母就是每个跳转tuple的第一个元素。

      那就很清晰了:

      1.总数据里首先统计每个分母出现的次数,根据分母分组 map转化结构为(分母,1)然后聚合reducebykey

      2.接下来根据用户id分组,那么得到的rdd里key是用户id values是这个用户发生的所有session会话,需要按时间排序

      3.接下来需要再使用zip将这个用户发生的会话进行一个跳表链接 例如一个用户发生的所有会话排序后为 1,3,5,8 那么使用zip后可以得到((1,3)(3,5)(5,8)),然后还需要进行一次数据结构转换方便统计跳表出现的次数 将(1,3)转化为((1,3),1)

      4. 实际上到这一步我们得到跳表频率后根本用不到其他数据字段了,那么再加一个map 直接把数据变成跳表频率的集合就可以啦。然后聚合一下reducebykey得到跳表频率。

      5. 最后需要针对每个跳表找到他们对应的分母然后计算跳转率即可!