【Flink-1.17-教程】-【四】Flink DataStream API(7)输出算子(Sink)

【Flink-1.17-教程】-【四】Flink DataStream API(7)输出算子(Sink)

  • 1)连接到外部系统
  • 2)输出到文件
  • 3)输出到 Kafka
  • 4)输出到 MySQL(JDBC)
  • 5)自定义 Sink 输出

    Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

    1)连接到外部系统

    Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的。

    Flink1.12 以前,Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

    stream.addSink(new SinkFunction(…));
    

    addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

    Flink1.12 开始,同样重构了 Sink 架构,

    stream.sinkTo(…)
    

    当然,Sink 多数情况下同样并不需要我们自己实现。之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。Flink 官方为我们提供了一部分的框架的 Sink 连接器。如下图所示,列出了 Flink 官方目前支持的第三方系统连接器:

    我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source / sink 两端都能连接,可读可写;而对于 Elasticsearch、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。

    除 Flink 官方之外,Apache Bahir 框架,也实现了一些其他第三方系统与 Flink 的连接器。

    除此以外,就需要用户自定义实现 sink 连接器了。

    2)输出到文件

    Flink 专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink 支持的文件系统。

    FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:

    • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
    • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

      示例:

      public class SinkFile { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // TODO 每个目录中,都有 并行度个数的 文件在写入
              env.setParallelism(2);
              // 必须开启checkpoint,否则一直都是 .inprogress
              env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
              DataGeneratorSource dataGeneratorSource = new DataGeneratorSource<>(
                      new GeneratorFunction() { @Override
                          public String map(Long value) throws Exception { return "Number:" + value;
                          }
                      },
                      Long.MAX_VALUE,
                      RateLimiterStrategy.perSecond(1000),
                      Types.STRING
              );
              DataStreamSource dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
              // TODO 输出到文件系统
              FileSink fieSink = FileSink
                      // 输出行式存储的文件,指定路径、指定编码
                      .forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                      // 输出文件的一些配置: 文件名的前缀、后缀
                      .withOutputFileConfig(
                              OutputFileConfig.builder()
                                      .withPartPrefix("atguigu-")
                                      .withPartSuffix(".log")
                                      .build()
                      )
                      // 按照目录分桶:如下,就是每个小时一个目录
                      .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                      // 文件滚动策略:  1分钟 或 1m
                      .withRollingPolicy(
                              DefaultRollingPolicy.builder()
                                      .withRolloverInterval(Duration.ofMinutes(1))
                                      .withMaxPartSize(new MemorySize(1024*1024))
                                      .build()
                      )
                      .build();
              dataGen.sinkTo(fieSink);
              env.execute();
          }
      }
      

      3)输出到 Kafka

      1、添加 Kafka 连接器依赖。

      由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

      2、启动 Kafka 集群。

      3、编写输出到 Kafka 的示例代码。

      (1)输出无 key 的 record:

      public class SinkKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              // 如果是精准一次,必须开启checkpoint(后续章节介绍)
              env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
              SingleOutputStreamOperator sensorDS = env
                      .socketTextStream("hadoop102", 7777);
              /**
               * Kafka Sink:
               * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
               * 1、开启checkpoint(后续介绍)
               * 2、设置事务前缀
               * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间 < max的15分钟
               */
              KafkaSink kafkaSink = KafkaSink.builder()
                      // 指定 kafka 的地址和端口
                      .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                      // 指定序列化器:指定Topic名称、具体的序列化
                      .setRecordSerializer(
                              KafkaRecordSerializationSchema.builder()
                                      .setTopic("ws")
                                      .setValueSerializationSchema(new SimpleStringSchema())
                                      .build()
                      )
                      // 写到kafka的一致性级别: 精准一次、至少一次
                      .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                      // 如果是精准一次,必须设置 事务的前缀
                      .setTransactionalIdPrefix("atguigu-")
                      // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                      .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                      .build();
              sensorDS.sinkTo(kafkaSink);
              env.execute();
          }
      }
      

      (2)自定义序列化器,实现带 key 的 record:

      public class SinkKafkaWithKey { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
              env.setRestartStrategy(RestartStrategies.noRestart());
              SingleOutputStreamOperator sensorDS = env
                      .socketTextStream("hadoop102", 7777);
              /**
               * 如果要指定写入kafka的key
               * 可以自定义序列器:
               * 1、实现 一个接口,重写 序列化 方法
               * 2、指定key,转成 字节数组
               * 3、指定value,转成 字节数组
               * 4、返回一个 ProducerRecord对象,把key、value放进去
               *
               */
              KafkaSink kafkaSink = KafkaSink.builder()
                      .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                      .setRecordSerializer(
                              new KafkaRecordSerializationSchema() { @Nullable
                                  @Override
                                  public ProducerRecord serialize(String element, KafkaSinkContext context, Long timestamp) { String[] datas = element.split(",");
                                      byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                      byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                      return new ProducerRecord<>("ws", key, value);
                                  }
                              }
                      )
                      .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                      .setTransactionalIdPrefix("atguigu-")
                      .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                      .build();
              sensorDS.sinkTo(kafkaSink);
              env.execute();
          }
      }
      

      4、运行代码,在 Linux 主机启动一个消费者,查看是否收到数据。

      [hadoop102 ~]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
      

      4)输出到 MySQL(JDBC)

      写入数据的 MySQL 的测试步骤如下。

      1、添加依赖。

      添加 MySQL 驱动:

      mysqlmysql-connector-java8.0.27

      官方还未提供 flink-connector-jdbc 的 1.17.0 的正式依赖,暂时从 apache snapshot 仓库下载,pom 文件中指定仓库路径:

      apache-snapshotsapache snapshotshttps://repository.apache.org/content/repositories/snapshots/

      添加依赖:

      org.apache.flinkflink-connector-jdbc1.17-SNAPSHOT

      如果不生效,还需要修改本地 maven 的配置文件,mirrorOf 中添加如下标红内容:

      aliyunmaven*,!apache-snapshots阿里云公共仓库https://maven.aliyun.com/repository/public

      2、启动 MySQL,在 test 库下建表 ws。

      mysql>CREATE TABLE `ws` (
      `id` varchar(100) NOT NULL,
      `ts` bigint(20) DEFAULT NULL,
      `vc` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8
      

      3、编写输出到 MySQL 的示例代码。

      public class SinkMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              SingleOutputStreamOperator sensorDS = env
                      .socketTextStream("hadoop102", 7777)
                      .map(new WaterSensorMapFunction());
              /**
               * TODO 写入mysql
               * 1、只能用老的sink写法: addsink
               * 2、JDBCSink的4个参数:
               *    第一个参数: 执行的sql,一般就是 insert into
               *    第二个参数: 预编译sql, 对占位符填充值
               *    第三个参数: 执行选项 ---》 攒批、重试
               *    第四个参数: 连接选项 ---》 url、用户名、密码
               */
              SinkFunction jdbcSink = JdbcSink.sink(
                      "insert into ws values(?,?,?)",
                      new JdbcStatementBuilder() { @Override
                          public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException { //每收到一条WaterSensor,如何去填充占位符
                              preparedStatement.setString(1, waterSensor.getId());
                              preparedStatement.setLong(2, waterSensor.getTs());
                              preparedStatement.setInt(3, waterSensor.getVc());
                          }
                      },
                      JdbcExecutionOptions.builder()
                              .withMaxRetries(3) // 重试次数
                              .withBatchSize(100) // 批次的大小:条数
                              .withBatchIntervalMs(3000) // 批次的时间
                              .build(),
                      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                              .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                              .withUsername("root")
                              .withPassword("000000")
                              .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                              .build()
              );
              sensorDS.addSink(jdbcSink);
              env.execute();
          }
      }
      

      4、运行代码,用客户端连接 MySQL,查看是否成功写入数据。

      5)自定义 Sink 输出

      如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,就只能自定义 Sink 进行输出了。与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的 .addSink() 方法就可以自定义写入任何外部存储。

      stream.addSink(new MySinkFunction());
      

      在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

      这种方式比较通用,对于任何外部存储系统都有效;不过自定义 Sink 想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器 Flink 官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。