Flink 集成 KafkaSource

一个全新的 Flink Stream Api 项目,要从本地的 Kafka 中读取数据。本文给出 demo 代码。

0 前提说明

截止 2024 年 4 月 25 日,Flink 1.19 版本没有提供 Kafka connector。所以,最多只能用 Flink 1.18 版本。

1 添加依赖

在 pom.xml 中添加如下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.1</version>
</dependency>

2 在 Kafka 中准备测试数据

创建一个名为 test 的 topic,向里面插入几条数据。

具体可见 https://baimoz.me/2679/

3 编写 Stream Job

demo 代码如下。

public class KafkaTestJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test")
                .setGroupId("my-group001")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
        env.execute();
    }
}

运行,即可看到输出。

4 参考文档