Five Ways to Read Data in Flink
- Reading from a file
- Reading from a socket
- Reading from Kafka
- Reading from MySQL
- Reading from a custom data source
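Reading from a file
This is the simplest way to read data. For functional testing, you can save the test data in a file, read it back, and check that the stream-processing logic behaves as expected.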
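One way such a functional test could look is sketched here, assuming the logic under test is a simple word count. The object name `readFileWordCount`, the `tokenize` helper, and the default relative path `resources/wc.txt` are hypothetical and chosen only for illustration. Keeping the splitting rule in a plain function means it can be checked without starting a Flink job.

```scala
import org.apache.flink.streaming.api.scala._

object readFileWordCount {
  // Pure helper: split a line into (word, 1) pairs, lower-cased.
  // Kept outside the Flink pipeline so it can be checked on its own.
  def tokenize(line: String): Seq[(String, Int)] =
    line.toLowerCase.split("\\s+").filter(_.nonEmpty).map(word => (word, 1)).toSeq

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical default path; pass the real file as the first argument.
    val fileName = if (args.nonEmpty) args(0) else "resources/wc.txt"

    env.readTextFile(fileName)
      .flatMap(line => tokenize(line)) // one (word, 1) pair per word
      .keyBy(_._1)                     // group by the word
      .sum(1)                          // running count per word
      .print()

    env.execute("word count from file")
  }
}
```

Because the job is a streaming job, `sum(1)` prints a running count each time a word is seen rather than a single final total.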
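Code:
package cn.jihui.flink

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object readFile {
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment (local when run from the IDE).
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Absolute path of the test file; adjust it to your own machine.
    val file_name = "C:\\Users\\32985\\IdeaProjects\\flink_demo1\\resources\\wc.txt"

    // Read the file line by line as a stream of strings and print each line.
    val streamData = env.readTextFile(file_name)
    streamData.print()

    // Nothing runs until execute() is called.
    env.execute("read data from file")
  }
}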
Output:
"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=61478:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo1\out\production\flink_demo1;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar;D:\Bigdata\flink-1.10.1\lib\log4j-1.2.17.jar;D:\Bigdata\flink-1.10.1\lib\slf4j-log4j12-1.7.15.jar;D:\Bigdata\flink-1.10.1\lib\flink-dist_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table-blink_2.12-1.10.1.jar cn.jihui.flink.readFile
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2> hello world
3> how are you
5> I am fine
7> how old are you

Process finished with exit code 0
Reading from a socket
This is convenient for verifying scenarios in which the data arrives over a socket.
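When the same job has to be pointed at different test servers, the host and port can be taken from the command line instead of being fixed in the source. The following is a minimal sketch of that idea, assuming Flink's `ParameterTool` utility is on the classpath. The object name `readSocketArgs`, the `endpoint` helper, the `--host` and `--port` argument names, and the `localhost`/`9999` defaults are all hypothetical.

```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object readSocketArgs {
  // Resolve (host, port) from arguments such as: --host 172.16.3.6 --port 9999
  // Falls back to the hypothetical defaults localhost:9999 when they are absent.
  def endpoint(args: Array[String]): (String, Int) = {
    val params = ParameterTool.fromArgs(args)
    (params.get("host", "localhost"), params.getInt("port", 9999))
  }

  def main(args: Array[String]): Unit = {
    val (host, port) = endpoint(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Each newline-terminated line received on the socket becomes one record.
    env.socketTextStream(host, port).print()

    env.execute("read data from socket")
  }
}
```

Run with, for example, `--host 172.16.3.6 --port 9999` as program arguments in the IDE run configuration.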
Code:
package cn.jihui.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object readSocket {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Address of the server on which nc is listening.
    val ip = "172.16.3.6"
    val port = 9999

    // Connect to the socket and treat every received line as one record.
    val streamData = env.socketTextStream(ip, port)
    streamData.print()

    env.execute("read data from socket")
  }
}
To test, first start nc on the server at 172.16.3.6, then start the Flink program to read the data. For example:
[bigdata@vm6 ~]$ nc -lk 9999
hello world
how are you
happy new year
Each line typed into nc is received by Flink.
Flink prints the following:
"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=61731:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo1\out\production\flink_demo1;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar;D:\Bigdata\flink-1.10.1\lib\log4j-1.2.17.jar;D:\Bigdata\flink-1.10.1\lib\slf4j-log4j12-1.7.15.jar;D:\Bigdata\flink-1.10.1\lib\flink-dist_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table-blink_2.12-1.10.1.jar cn.jihui.flink.readSocket
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
3> hello world
4> how are you
5> happy new year
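Reading from Kafka
Reading from Kafka is probably the most widely used approach in Flink applications. My current clients also process their streaming data this way.
To read from Kafka, use flink-connector-kafka.
Create a Maven project and add the following dependency: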
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.2</version>
</dependency>
Code:
package cn.jihui.flink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object readKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration: broker address and consumer group.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "172.16.3.6:9092")
    properties.setProperty("group.id", "data-group")

    // Consume the "sensor" topic, decoding each message as a plain string.
    val streamData = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema, properties))
    streamData.print()

    env.execute()
  }
}
Start Kafka:
[bigdata@vm6 kafka-2.6.0]$ ./bin/kafka-server-start.sh -daemon config/server.properties
Check that it started:
[bigdata@vm6 ~]$ jps
8690 QuorumPeerMain
21068 Jps
10366 Kafka
[bigdata@vm6 ~]$
Start Kafka's console producer for testing:
[bigdata@vm6 bin]$ ./kafka-console-producer.sh --broker-list 172.16.3.6:9092 --topic sensor
>hello world
>happy new year
>what are you doing here?
>
With the Flink program running, each line typed into the console producer is received by Flink and printed.
Output:
"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=60325:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo\target\classes;C:\Users\32985\.m2\repository\org\apache\flink\flink-scala_2.12\1.10.1\flink-scala_2.12-1.10.1.jar;C:\Users\32985\.m2\repository\org\apache\flink\flink-core\1.10.1\flink-core-1.10.1.jar;C:\Users\32985\.m2\repository\org\apache\flink\flink-annotations\1.10.1\flink-annotations-1.10.1.jar;C: ...... :\Users\32985\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\32985\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;C:\Users\32985\.m2\repository\mysql\mysql-connector-java\5.1.46\mysql-connector-java-5.1.46.jar;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar cn.jihui.flink.readKafka
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Users/32985/.m2/repository/org/apache/flink/flink-core/1.10.1/flink-core-1.10.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
7> hello world
7> happy new year
7> what are you doing here?

Process finished with exit code -1
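By default the Flink Kafka consumer starts from the offsets committed for its consumer group, and falls back to Kafka's `auto.offset.reset` setting when the group has none. Lines typed into the producer before the job started may therefore not appear. The sketch below shows one way to replay the topic from the beginning with `setStartFromEarliest()`. It assumes the same topic, broker, and group as in this section; the object name `readKafkaFromEarliest` and the `kafkaProps` helper are hypothetical.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object readKafkaFromEarliest {
  // Build the consumer configuration from a broker list and a group id.
  def kafkaProps(servers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", servers)
    props.setProperty("group.id", groupId)
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val consumer = new FlinkKafkaConsumer011[String](
      "sensor", new SimpleStringSchema(), kafkaProps("172.16.3.6:9092", "data-group"))

    // Ignore committed group offsets and read the topic from its first record.
    consumer.setStartFromEarliest()

    env.addSource(consumer).print()
    env.execute("read data from kafka (earliest)")
  }
}
```

`setStartFromLatest()` and `setStartFromGroupOffsets()` are the corresponding calls for the other two common start positions.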
Reading from MySQL
When you need to read from a relational database, you can simply read it over JDBC.
Code:
package cn.jihui.flink

import java.sql.{DriverManager, ResultSet}

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

// SensorReading is expected to be declared elsewhere in this package
// (the custom data source example declares it), so it stays commented out here:
//case class SensorReading(id: String, timestamp: Long, temperature: Double)

object readMySQL {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamData = env.addSource(new MySQLSource)
    streamData.print()
    env.execute("read data from mysql")
  }
}

class MySQLSource extends SourceFunction[SensorReading] {
  // Cleared by cancel(), which Flink calls from another thread, hence volatile.
  @volatile var running: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val strConn = "jdbc:mysql://172.16.3.6:3306/flink"
    val conn = DriverManager.getConnection(strConn, "jihui", "111111")
    val selectStmt = conn.prepareStatement("select * from t_sensor;")
    val resultRs: ResultSet = selectStmt.executeQuery()

    // Stop as soon as the source is cancelled or the rows run out.
    // (Checking the flag only inside the loop body would keep scanning the result set.)
    while (running && resultRs.next()) {
      val id = resultRs.getString(1)
      val timestamp = resultRs.getLong(2)
      val temperature = resultRs.getDouble(3)
      sourceContext.collect(SensorReading(id, timestamp, temperature))
    }

    resultRs.close()
    selectStmt.close()
    conn.close()
  }

  override def cancel(): Unit = {
    running = false
  }
}
Output:
"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=61886:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath ...... C:\Users\32985\.m2\repository\org\apache\kafka\kafka-clients\0.11.0.2\kafka-clients-0.11.0.2.jar;C:\Users\32985\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\32985\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;C:\Users\32985\.m2\repository\mysql\mysql-connector-java\5.1.46\mysql-connector-java-5.1.46.jar;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar cn.jihui.flink.readMySQL
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Users/32985/.m2/repository/org/apache/flink/flink-core/1.10.1/flink-core-1.10.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
7> SensorReading(Sensor_4,1687944878278,42.6205)
1> SensorReading(Sensor_10,1687944879294,31.3545)
7> SensorReading(Sensor_7,1687944878278,16.3773)
1> SensorReading(Sensor_4,1687944879294,42.6205)
7> SensorReading(Sensor_8,1687944879294,7.3655)
7> SensorReading(Sensor_2,1687944878278,32.3747)
8> SensorReading(Sensor_1,1687944879294,20.0778)
8> SensorReading(Sensor_9,1687944878278,7.65736)
5> SensorReading(Sensor_3,1687944878278,32.9043)
6> SensorReading(Sensor_1,1687944880296,20.0778)
5> SensorReading(Sensor_10,1687944880296,31.3545)
2> SensorReading(Sensor_8,1687944880296,7.3655)
6> SensorReading(Sensor_6,1687944878278,4.37994)
2> SensorReading(Sensor_5,1687944880296,42.1028)
2> SensorReading(Sensor_2,1687944880296,32.3747)
4> SensorReading(Sensor_10,1687944878278,31.3545)
4> SensorReading(Sensor_9,1687944879294,7.65736)
4> SensorReading(Sensor_9,1687944880296,7.65736)
5> SensorReading(Sensor_2,1687944879294,32.3747)
5> SensorReading(Sensor_6,1687944879294,4.37994)
3> SensorReading(Sensor_5,1687944879294,42.1028)
4> SensorReading(Sensor_7,1687944879294,16.3773)
6> SensorReading(Sensor_5,1687944878278,42.1028)
8> SensorReading(Sensor_8,1687944878278,7.3655)
1> SensorReading(Sensor_4,1687944880296,42.6205)
6> SensorReading(Sensor_3,1687944879294,32.9043)
3> SensorReading(Sensor_7,1687944880296,16.3773)
8> SensorReading(Sensor_6,1687944880296,4.37994)
3> SensorReading(Sensor_3,1687944880296,32.9043)
3> SensorReading(Sensor_1,1687944878278,20.0778)

Process finished with exit code 0
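A variant of the JDBC source can keep connection handling out of `run()` by extending `RichSourceFunction`, whose `open()` and `close()` hooks Flink calls when the source task starts and stops. The following is only a sketch under that assumption: the class name `RichMySQLSource` and its constructor parameters are hypothetical, the column order is taken from the example in this section, and `SensorReading` is redeclared so that the snippet is self-contained.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

case class SensorReading(id: String, timestamp: Long, temperature: Double)

class RichMySQLSource(url: String, user: String, password: String)
    extends RichSourceFunction[SensorReading] {

  // Cleared by cancel(), which runs on a different thread than run().
  @volatile private var running = true
  // JDBC handles are created in open(), so they are never serialized with the function.
  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  // Called before run(): acquire the connection and prepare the query.
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, password)
    stmt = conn.prepareStatement("select * from t_sensor")
  }

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rs = stmt.executeQuery()
    try {
      // Emit one record per row until the rows run out or the source is cancelled.
      while (running && rs.next()) {
        ctx.collect(SensorReading(rs.getString(1), rs.getLong(2), rs.getDouble(3)))
      }
    } finally rs.close()
  }

  override def cancel(): Unit = running = false

  // Called when the task ends, including after a failure: release the JDBC resources.
  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```

It would be used as `env.addSource(new RichMySQLSource("jdbc:mysql://172.16.3.6:3306/flink", "jihui", "111111"))`, with `org.apache.flink.streaming.api.scala._` imported at the call site.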
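Reading from a custom data source
When you need to performance-test Flink, a custom data source can simplify the testing process.
Flink supports custom data sources: you can generate test data in whatever format you need inside a loop, which makes this approach very flexible.
Code: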
package cn.jihui.flink

import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object readCustom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataCustom = env.addSource(new MySensorSource())
    dataCustom.print()
    env.execute("custom data source")
  }
}

class MySensorSource extends SourceFunction[SensorReading] {
  // Cleared by cancel() to stop the generation loop.
  var running: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()

    // Initial temperature for each of the 10 sensors, uniform in [0, 50).
    var currTemp = 1.to(10).map(i => ("Sensor_" + i, rand.nextDouble() * 50))

    while (running) {
      val currTime = System.currentTimeMillis()

      // Add Gaussian noise to every temperature. map returns a new collection,
      // so the result is assigned back; otherwise the readings never change.
      currTemp = currTemp.map(data => (data._1, data._2 + rand.nextGaussian()))

      // Emit one reading per sensor, all stamped with the same time.
      currTemp.foreach(
        data => sourceContext.collect(SensorReading(data._1, currTime, data._2))
      )
    }
  }

  override def cancel(): Unit = running = false
}
Once started, the program produces an endless stream of data.
Output:
"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=62739:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo1\out\production\flink_demo1;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar;D:\Bigdata\flink-1.10.1\lib\log4j-1.2.17.jar;D:\Bigdata\flink-1.10.1\lib\slf4j-log4j12-1.7.15.jar;D:\Bigdata\flink-1.10.1\lib\flink-dist_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table-blink_2.12-1.10.1.jar cn.jihui.flink.readCustom
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
7> SensorReading(Sensor_4,1687944878278,42.62052120270371)
1> SensorReading(Sensor_6,1687944878278,4.3799399141547735)
3> SensorReading(Sensor_8,1687944878278,7.365503979569099)
5> SensorReading(Sensor_2,1687944878278,32.374658798648426)
6> SensorReading(Sensor_3,1687944878278,32.9042516066123)
2> SensorReading(Sensor_7,1687944878278,16.37730093567341)
8> SensorReading(Sensor_5,1687944878278,42.1027668775983)
4> SensorReading(Sensor_1,1687944878278,20.077801500835886)
5> SensorReading(Sensor_10,1687944878278,31.35446091105863)
4> SensorReading(Sensor_9,1687944878278,7.657356771011931)
5> SensorReading(Sensor_8,1687944879294,7.365503979569099)
3> SensorReading(Sensor_6,1687944879294,4.3799399141547735)
6> SensorReading(Sensor_1,1687944879294,20.077801500835886)
1> SensorReading(Sensor_4,1687944879294,42.62052120270371)
4> SensorReading(Sensor_7,1687944879294,16.37730093567341)
2> SensorReading(Sensor_5,1687944879294,42.1027668775983)
6> SensorReading(Sensor_9,1687944879294,7.657356771011931)
7> SensorReading(Sensor_2,1687944879294,32.374658798648426)
8> SensorReading(Sensor_3,1687944879294,32.9042516066123)
7> SensorReading(Sensor_10,1687944879294,31.35446091105863)
4> SensorReading(Sensor_5,1687944880296,42.1027668775983)
1> SensorReading(Sensor_2,1687944880296,32.374658798648426)
5> SensorReading(Sensor_6,1687944880296,4.3799399141547735)
7> SensorReading(Sensor_8,1687944880296,7.365503979569099)
6> SensorReading(Sensor_7,1687944880296,16.37730093567341)
2> SensorReading(Sensor_3,1687944880296,32.9042516066123)
3> SensorReading(Sensor_4,1687944880296,42.62052120270371)
8> SensorReading(Sensor_1,1687944880296,20.077801500835886)
1> SensorReading(Sensor_10,1687944880296,31.35446091105863)
8> SensorReading(Sensor_9,1687944880296,7.657356771011931)

Process finished with exit code -1
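The temperature update inside the generation loop is easy to get wrong: Scala's default collections are immutable, so `map` returns a new collection instead of changing the existing one, and the update only takes effect if its result is kept. The following is a minimal pure-Scala sketch of that update step, independent of Flink. The object name `sensorWalk` and the helpers `initial` and `step` are hypothetical.

```scala
import java.util.Random

object sensorWalk {
  type Temps = Seq[(String, Double)]

  // Starting temperatures for sensors Sensor_1 .. Sensor_n, uniform in [0, 50).
  def initial(rand: Random, n: Int = 10): Temps =
    (1 to n).map(i => ("Sensor_" + i, rand.nextDouble() * 50))

  // One random-walk step: returns a NEW sequence with Gaussian noise added.
  // The input sequence is left untouched.
  def step(rand: Random, temps: Temps): Temps =
    temps.map { case (id, t) => (id, t + rand.nextGaussian()) }

  def main(args: Array[String]): Unit = {
    val rand = new Random()
    var temps = initial(rand)
    for (_ <- 1 to 3) {
      temps = step(rand, temps) // keep the result, or the values never move
      println(temps.take(2))
    }
  }
}
```

Inside a source's `run` method the same pattern applies: assign the result of `step` back to the variable before emitting the readings.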