Spark-stream基础---sparkStreaming和Kafka整合wordCount单词计数(1)

//1.创建StreamingContext

val conf: SparkConf = new SparkConf().setAppName(“kafkaWordCount”).setMaster(“local[2]”)

val ssc: StreamingContext = new StreamingContext(conf,Milliseconds(2000))

//2.接入kafka数据源(如何访问kafka集群?zookeeper)

val zkQuorm: String = “192.168.64.111,192.168.64.112,192.168.64.113”

//访问组

val groupID = “g1”

//访问主题

val topic: Map[String, Int] = MapString,Int

//创建Dstream

val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils

.createStream(ssc,zkQuorm,groupID,topic)

//3.处理数据

val data: DStream[String] = kafkaStream.map(_._2)

//4.启动streaming程序

val r: DStream[(String, Int)] = data.flatMap(.split(" ")).map((,1)).reduceByKey(+)

r.print()

ssc.start()

//5.关闭资源

ssc.awaitTermination()

}

}

结果

2.0版本单词计数

将历史记录保存下来,显示出来,主要使用dataFunc

package day08

import org.apache.spark.{HashPartitioner, SparkConf}

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object StatusKafkaWordCount {

//保持历史状态 wc 单词,次数 聚合的key

//第一个类型:单词,第二个类型:在每一个分区中出现的次数累加的结果

//第三个类型:是以前的结果

val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])]) => {

//总的次数= 当前出现的次数 + 以前返回的结果

iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))

}

def main(args: Array[String]): Unit = {

//1.创建程序入口

val conf: SparkConf = new SparkConf().setAppName(“StateKafkaWC”).setMaster(“local[2]”)

val ssc: StreamingContext = new StreamingContext(conf,Milliseconds(2000))

//2.需要累加历史数据 checkpoints

ssc.checkpoint(“hdfs://192.168.64.111:9000/ck”)

//3.接入kafka数据源

val zkQuorm: String = “192.168.64.111,192.168.64.112,192.168.64.113”

//访问组

val groupID = “g1”

//访问主题

val topic: Map[String, Int] = MapString,Int

//创建Dstream

val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils

.createStream(ssc,zkQuorm,groupID,topic)

//4.处理数据

val data: DStream[String] = kafkaStream.map(_._2)

//5.加入历史数据计算

val r: DStream[(String, Int)] = data.flatMap(.split(" ")).map((, 1))

//参数1:自定义业务函数 参数2:分区器设置 参数3:是否使用

.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

//6.打印

r.print()

//7.启动程序

ssc.start()

//8.关闭资源

ssc.awaitTermination()

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Python工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Python开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上前端开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以扫码获取!!!(备注Python)

008edf79.png)

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上前端开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以扫码获取!!!(备注Python)