创建并运行一个 Flink Stream Api 的 HelloWorld 项目
- Flink
- 2024-04-24
- 725热度
- 0评论
导航
工作需要,重拾 Flink,从最简单的 demo 开始上手。
1 创建项目
假设需要从零开发一个 Stream Api 的项目,怎样搭建项目基础框架呢?
使用 maven,执行以下命令:
mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-walkthrough-datastream-java \ -DarchetypeVersion=1.19.0 \ -DgroupId=com.tarsocial.demo \ -DartifactId=flink-ad-demo \ -Dversion=0.1 \ -Dpackage=com.plough.demo.flink \ -DinteractiveMode=false
2 文件解释
会生成两个代码文件。
- FraudDetectionJob,流处理任务的执行流程,入口类
- FraudDetector,一个示例算子
/** * Skeleton code for the datastream walkthrough */ public class FraudDetectionJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Transaction> transactions = env .addSource(new TransactionSource()) .name("transactions"); DataStream<Alert> alerts = transactions .keyBy(Transaction::getAccountId) .process(new FraudDetector()) .name("fraud-detector"); alerts .addSink(new AlertSink()) .name("send-alerts"); env.execute("Fraud Detection"); } }
/** * Skeleton code for implementing a fraud detector. */ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { private static final long serialVersionUID = 1L; private static final double SMALL_AMOUNT = 1.00; private static final double LARGE_AMOUNT = 500.00; private static final long ONE_MINUTE = 60 * 1000; @Override public void processElement( Transaction transaction, Context context, Collector<Alert> collector) throws Exception { Alert alert = new Alert(); alert.setId(transaction.getAccountId()); Thread.sleep(1000); collector.collect(alert); } }
3 提交运行
- 执行 mvn package,打出 jar 包
- 使用 Flink 的 WebUI,上传 jar 包,运行