导航
一个全新的 Flink Stream Api 项目,要从本地的 Kafka 中读取数据。本文给出 demo 代码。
0 前提说明
截止 2024 年 4 月 25 日,Flink 1.19 版本没有提供 Kafka connector。所以,最多只能用 Flink 1.18 版本。
1 添加依赖
在 pom.xml 中添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.1.0-1.18</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.18.1</version>
</dependency>
2 在 Kafka 中准备测试数据
创建一个名为 test 的 topic,向里面插入几条数据。
3 编写 Stream Job
demo 代码如下。
public class KafkaTestJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test")
.setGroupId("my-group001")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
env.execute();
}
}
运行,即可看到输出。
