文章目录
- 概要
- 整体案例需求
- 1. 获取Top10的品类
- 2.获取Top10品类后 找到每个品类里活跃度Top10的用户id
- 3.计算单页跳转率
概要
Spark和Scala混合学习也有接近两周了,最近学完了RDD数据结构,以及一堆算子,跟着课程做第一个案例实操复习,现做一个学习笔记。
整体案例需求
这个案例需求一共分为三个:
1.找到排名前十的品类,排名逻辑跟 点击量/下单量/支付量有关
即先比点击量 如果一样再比下单量 如果还一样最后比支付量。
2.找到排名前十品类里每个品类里活跃度排名前十的用户
即根据用户名出现的次数排序得到排名前十的用户
3.计算单页跳转率
1. 获取Top10的品类
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object test_project2_pro2 { def main(args: Array[String]): Unit = { // 配置spark运行环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test_rdd") val sc = new SparkContext(sparkConf) // 获取文件rdd val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6) // 为了防止rdd被调用多次 生成多个数据源 造成数据重复调用 浪费内存资源 // 可以在rdd创建后添加一个持久化 val rdd_cache: rdd.type = rdd.cache() // 在点击/下单/支付三个数据源里 最终得到的rdd里将数据格式由(品类id,次数)修改为(品类id,(点击数量,下单数量,支付数量)) // 在点击数据源里该数据为(品类id,(点击数量,0,0)) 其他两个类似只不过0变了位置 // 过滤非品类数据行 val rdd1: RDD[String] = rdd.filter(x => { var datas = x.split("_")(6) datas != "-1" }) // 将品类数据行转化成 频率tuple (品类id,1) // 再根据品类id 做key 进行聚合操作得到 (品类id,sum) val rdd2 = rdd1.map(x => { val category: String = x.split("_")(6) (category, (1,0,0)) }) // 过滤非下单数据行 val rdd3: RDD[String] = rdd_cache.filter(x => { var datas = x.split("_")(8) datas != "null" }) // 下单的品类ID是连续的 因为存在一次下单多个品类的情况 即可能得到 1,2,3 这样的字段 // 需要扁平化处理一下 将1,2,3 转化成 [1,2,3]的独立元素 // 再通过map 添加频率计数 将[1,2,3]转化成[(1,1),(2,1),(3,1)] // 要注意这里必须要用flatmap 进行扁平化处理 ,否则直接使用map处理得到的是二维的数据 即每行数据转换成了数组 // 使用flatmap则不会 得到的是list即把每行数据得到的数组进行拆分变成一个一个的元素 // 得到下单数据单位元素 由于使用了flatmap 最终得到的rdd里元素格式就是(string,(int,int,int)) val rdd5 = rdd3.flatMap(x => { val orders: String = x.split("_")(8) val order = orders.split(",") order.map(x => (x, (0,1,0))) }) // 这里如果使用map得到的是数组 相当于每行的string变成了array 并没有降维 // val rdd4: RDD[Array[(String, Int)]] = rdd3.map(x => { // val order: String = x.split("_")(8) // val orders = order.split(",").map(x => (x, 1)) // orders // }) // 过滤非支付数据行 val rdd6: RDD[String] = rdd_cache.filter(x => { var datas = x.split("_")(10) datas != "null" }) // 得到支付数据单位元素 同下单一样 val rdd7 = rdd6.flatMap(x => { val pays: String = x.split("_")(10) val pay = pays.split(",") pay.map(x => (x, (0,0,1))) }) // 现在我们统计了各品类id 在点击/下单/支付里出现的次数 // 我们现在想选出品类id 在三者综合数量排名前十的品类 // 我们需要使用cogroup聚合三个rdd 得到(key:品类id,value:(点击数量 下单数量 支付数量))这样的数据类型 // 要注意的是 cogroup得到的数据类型里 value里的元素是迭代器 但是实际上我们聚合的三组里以及提前统计了一遍聚合 // 所以这里虽然是迭代器 但每个迭代器里只有一个元素 因为在聚合的三个rdd里每个品类id已经被聚合过了 只出现了一次 // 使用mapvalues处理cogroup得到的rdd里的每个品类id key对应的value 转化为(点击,下单,支付)的格式 /** * val rdd_final: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = rdd2.cogroup(rdd5, rdd7) * val rdd_final_ : RDD[(String, (Int, Int, Int))] = rdd_final.mapValues(v => { * var click = 0 * val i1 = v._1.iterator * if (i1.hasNext) { * click = i1.next() * } * var order = 0 * val i2 = v._2.iterator * if (i2.hasNext) { * order = i2.next() * } * var pay = 0 * val i3 = v._3.iterator * if (i3.hasNext) { * pay = i3.next() * } * (click, order, pay) * }) * */ // 当我们把三个数据源的数据格式修改为(key,(click,order,pay))时,即可将三个数据源直接合并然后聚合 val rdd_final: RDD[(String, (Int, Int, Int))] = rdd2.union(rdd5).union(rdd7).reduceByKey((v1, v2) => { (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3) }) rdd_final.saveAsTextFile("output_project2") val rdd_top10: Array[(String, (Int, Int, Int))] = rdd_final.sortBy(x => x._2, false).take(10) rdd_top10.foreach(println) } }
2.获取Top10品类后 找到每个品类里活跃度Top10的用户id
那我们首先要获得Top10的品类 得到一个list,
然后做数据筛选 把不是这10个品类的数据行去掉,最后在筛选后的数据里进行(品类,用户)双变量的分组,得到用户出现的次数,然后改变数据结构从((品类,用户),sum次数)->(品类,(用户,sum次数)),然后进行sortby排序(降序),得到最终结果
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object test_project2_pro3 { // 当我们统计出top10的品类id后,我们想找到top10品类里每个品类下top10的session以及他们的活跃次数 // 首先将获取top10品类的rdd计算逻辑打包成方法 def top10_category(rdd: RDD[String]): List[String] = { rdd.flatMap(x => { var datas = x.split("_") if (datas(6) != "-1") { List((datas(6), (1, 0, 0))) } else if (datas(8) != "null") { val orders: Array[String] = datas(8).split(",") orders.map(x => { (x, (0, 1, 0)) }) } else if (datas(10) != "null") { val pays: Array[String] = datas(10).split(",") pays.map(x => { (x, (0, 1, 0)) }) } else { Nil } }).reduceByKey((v1, v2) => { (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3) }).sortBy(x => x._2, false).map(x => x._1).take(10).toList } def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test_rdd") val sc = new SparkContext(sparkConf) val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6) // 定义好获得top品类的方法后 我们接下来就可以根据top品类筛数据然后根据(品类,用户)多重排序得到用户数量 val top10_c: List[String] = top10_category(rdd) val rdd2: RDD[(String, List[(String, Int)])] = rdd.filter(x => { var category = x.split("_")(6) if (category != "-1" & top10_c.contains(category)) { true } else { false } }).map(x => { var category = x.split("_")(6) var session = x.split("_")(2) ((category, session), 1) }).reduceByKey((a, b) => { a + b }).map(x => { (x._1._1, (x._1._2, x._2)) }).groupByKey().mapValues(v => { v.toList.sortBy(x => x._2)(Ordering.Int.reverse).take(10) }) rdd2.collect().foreach(println) } }
3.计算单页跳转率
这里要有个概念,首先session会话每个用户都会发生,而且计算跳转率的时候必须保证用户会话发生的记录是时间顺序的,这样的跳转才有意义,那么流程就很清晰了:
首先指定出想计算的有意义的跳转率(例如首页->详情,详情->下单等),需要先列一个表,例如页面id
1->2,2->3,3->4 在list里表示为((1,2),(2,3),(3,4)),如何实现这个链接表?需要用到zip算子,把相同位置的两个rdd连在一起,即rdd1=(1,2,3,4),rdd2=(2,3,4),那么rdd1.init.zip(rdd2)=((1,2),(2,3),(3,4)),这就是分子,那有了分子,我们需要分母,那分母是什么呢?想想看如果想计算1->2的跳转率=(1,2)出现的次数/1出现的次数,所以分母就是每个跳转tuple的第一个元素。
那就很清晰了:
1.总数据里首先统计每个分母出现的次数,根据分母分组 map转化结构为(分母,1)然后聚合reducebykey
2.接下来根据用户id分组,那么得到的rdd里key是用户id values是这个用户发生的所有session会话,需要按时间排序
3.接下来需要再使用zip将这个用户发生的会话进行一个跳表链接 例如一个用户发生的所有会话排序后为 1,3,5,8 那么使用zip后可以得到((1,3)(3,5)(5,8)),然后还需要进行一次数据结构转换方便统计跳表出现的次数 将(1,3)转化为((1,3),1)
4. 实际上到这一步我们得到跳表频率后根本用不到其他数据字段了,那么再加一个map 直接把数据变成跳表频率的集合就可以啦。然后聚合一下reducebykey得到跳表频率。
5. 最后需要针对每个跳表找到他们对应的分母然后计算跳转率即可!