使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能,以及Kafka和HDFS作为我们的数据传输和存储工具。

1、环境设置:

首先,确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件:

 4.0.0 org.example spark_project 1.0-SNAPSHOT  8 8 UTF-8 2.12.12 3.2.0 2.8.1     org.apache.spark spark-core_2.12 ${spark.version}   org.apache.spark spark-sql_2.12 ${spark.version}   com.alibaba fastjson 1.2.76   org.apache.spark spark-sql-kafka-0-10_2.12 ${spark.version}   org.apache.spark spark-streaming_2.12 ${spark.version}   org.apache.spark spark-streaming-kafka-0-10_2.12 ${spark.version}    org.apache.kafka kafka-clients ${kafka.version}   mysql mysql-connector-java 8.0.28    org.scala-lang scala-library ${scala.version}  

mysql中表结构:

2、从MySQL读取数据到Kafka:

我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据,并将其转换为JSON格式,然后将数据写入到Kafka主题中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfs
import org.apache.spark.sql.SparkSession
import java.util.Properties
object Mysql2Kafka { def main(args: Array[String]): Unit = { // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("MySQLToKafka")
      .master("local[*]")
      .getOrCreate()
    // 设置 MySQL 连接属性
    val mysqlProps = new Properties()
    mysqlProps.setProperty("user", "root")
    mysqlProps.setProperty("password", "12345678")
    mysqlProps.setProperty("driver", "com.mysql.jdbc.Driver")
    // 从 MySQL 数据库中读取数据
    val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/mydb", "comment", mysqlProps)
    // 将 DataFrame 转换为 JSON 字符串
    val jsonDF = jdbcDF.selectExpr("to_json(struct(*)) AS value")
    // 将数据写入 Kafka
    jsonDF.show()
    jsonDF
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "comment")
      .save()
    // 停止 SparkSession
    spark.stop()
  }
}

以上代码首先创建了一个SparkSession,然后设置了连接MySQL所需的属性。接着,它使用jdbc.read从MySQL数据库中读取数据,并将数据转换为JSON格式,最后将数据写入到名为"comment"的Kafka主题中。提示:topic主题会被自动创建。

从Kafka消费数据并写入HDFS:

接下来,我们将设置Spark Streaming来消费Kafka中的数据,并将数据保存到HDFS中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfs
import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
case class Comment(author_name:String,
                   fans:String,
                   comment_text:String,
                   comment_time:String,
                   location:String,
                   user_gender:String)
object kafka2Hdfs { def main(args: Array[String]): Unit = { // 设置 SparkConf
    val sparkConf = new SparkConf()
      .setAppName("KafkaToHDFS")
      .setMaster("local[*]")
    // 创建 StreamingContext,每秒处理一次
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // 设置 Kafka 相关参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092", // Kafka broker 地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-consumer-group", // Spark 消费者组
      "auto.offset.reset" -> "earliest", // 从最新的偏移量开始消费
      "enable.auto.commit" -> (false: java.lang.Boolean) // 不自动提交偏移量
    )
    // 设置要订阅的 Kafka 主题
    val topics = Array("comment")
    // 创建 Kafka Direct Stream
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // 从 Kafka 中读取消息,然后将其写入 HDFS
    stream.map({rdd=> val comment = JSON.parseObject(rdd.toString(), classOf[Comment])
      comment.author_name+","+comment.comment_text+","+comment.comment_time+","+comment.fans+","+comment.location+","+comment.user_gender
    }).foreachRDD { rdd => if (!rdd.isEmpty()) { println(rdd)
        rdd.saveAsTextFile("hdfs://hadoop101:8020/tmp/")
      }
    }
    // 启动 Spark Streaming
    ssc.start()
    ssc.awaitTermination()
  }
}

以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象,并将其保存为逗号分隔的文本文件,最终存储在HDFS的/tmp目录中。

结论:

通过本文的介绍和示例代码,您现在应该了解如何使用Apache Spark构建一个实时数据流水线,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。

**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **