//1.创建StreamingContext
val conf: SparkConf = new SparkConf().setAppName(“kafkaWordCount”).setMaster(“local[2]”)
val ssc: StreamingContext = new StreamingContext(conf,Milliseconds(2000))
//2.接入kafka数据源(如何访问kafka集群?zookeeper)
val zkQuorm: String = “192.168.64.111,192.168.64.112,192.168.64.113”
//访问组
val groupID = “g1”
//访问主题
val topic: Map[String, Int] = MapString,Int
//创建Dstream
val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils
.createStream(ssc,zkQuorm,groupID,topic)
//3.处理数据
val data: DStream[String] = kafkaStream.map(_._2)
//4.启动streaming程序
val r: DStream[(String, Int)] = data.flatMap(.split(" ")).map((,1)).reduceByKey(+)
r.print()
ssc.start()
//5.关闭资源
ssc.awaitTermination()
}
}
结果
2.0版本单词计数
将历史记录保存下来,显示出来,主要使用dataFunc
package day08
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object StatusKafkaWordCount {
//保持历史状态 wc 单词,次数 聚合的key
//第一个类型:单词,第二个类型:在每一个分区中出现的次数累加的结果
//第三个类型:是以前的结果
val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])]) => {
//总的次数= 当前出现的次数 + 以前返回的结果
iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
}
def main(args: Array[String]): Unit = {
//1.创建程序入口
val conf: SparkConf = new SparkConf().setAppName(“StateKafkaWC”).setMaster(“local[2]”)
val ssc: StreamingContext = new StreamingContext(conf,Milliseconds(2000))
//2.需要累加历史数据 checkpoints
ssc.checkpoint(“hdfs://192.168.64.111:9000/ck”)
//3.接入kafka数据源
val zkQuorm: String = “192.168.64.111,192.168.64.112,192.168.64.113”
//访问组
val groupID = “g1”
//访问主题
val topic: Map[String, Int] = MapString,Int
//创建Dstream
val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils
.createStream(ssc,zkQuorm,groupID,topic)
//4.处理数据
val data: DStream[String] = kafkaStream.map(_._2)
//5.加入历史数据计算
val r: DStream[(String, Int)] = data.flatMap(.split(" ")).map((, 1))
//参数1:自定义业务函数 参数2:分区器设置 参数3:是否使用
.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
//6.打印
r.print()
//7.启动程序
ssc.start()
//8.关闭资源
ssc.awaitTermination()
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Python工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Python开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上前端开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以扫码获取!!!(备注Python)
008edf79.png)
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上前端开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以扫码获取!!!(备注Python)