Flink 集成 KafkaSource
- Flink
- 2024-04-25
- 921热度
- 0评论
导航
一个全新的 Flink Stream Api 项目,要从本地的 Kafka 中读取数据。本文给出 demo 代码。
0 前提说明
截止 2024 年 4 月 25 日,Flink 1.19 版本没有提供 Kafka connector。所以,最多只能用 Flink 1.18 版本。
1 添加依赖
在 pom.xml 中添加如下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>3.1.0-1.18</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>1.18.1</version> </dependency>
2 在 Kafka 中准备测试数据
创建一个名为 test 的 topic,向里面插入几条数据。
3 编写 Stream Job
demo 代码如下。
public class KafkaTestJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("localhost:9092") .setTopics("test") .setGroupId("my-group001") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print(); env.execute(); } }
运行,即可看到输出。