Work calls for it, so I am picking Flink back up, starting from the simplest possible demo.
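1 Create the project
Suppose you need to build a DataStream API project from scratch. How do you set up the project skeleton?
With Maven, run the following command: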
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=1.19.0 \
-DgroupId=com.tarsocial.demo \
-DartifactId=flink-ad-demo \
-Dversion=0.1 \
-Dpackage=com.plough.demo.flink \
-DinteractiveMode=false
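2 File walkthrough
The archetype generates two source files:
- FraudDetectionJob: the entry class, which defines the execution flow of the streaming job
- FraudDetector: a sample operator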
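import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Skeleton code for the datastream walkthrough
 */
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: a stream of sample transactions.
        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        // Partition by account id so each account is handled by the same FraudDetector instance.
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        // Sink: emit the alerts.
        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        // Nothing runs until execute() submits the job graph.
        env.execute("Fraud Detection");
    }
}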
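import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
    private static final long serialVersionUID = 1L;

    // Thresholds declared by the skeleton; processElement below does not use them yet.
    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {
        // The skeleton raises an alert for every transaction, with no fraud rule applied.
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        // Blocks the operator for one second per record, which throttles the output; demo use only.
        Thread.sleep(1000);
        collector.collect(alert);
    }
}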
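The skeleton declares SMALL_AMOUNT, LARGE_AMOUNT and ONE_MINUTE but never uses them. One plausible reading, assumed here, is the rule "a small transaction directly followed by a large one on the same account within one minute". The following is a minimal plain-Java sketch of that rule with no Flink dependency, so it can be run on its own. The class FraudRuleSketch, the Tx type and the detect method are hypothetical names made up for illustration, and a HashMap stands in for Flink keyed state. A real operator would more likely use Flink's state and timer facilities rather than comparing timestamps directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a small-then-large fraud rule (assumed rule, no Flink involved). */
public class FraudRuleSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000;

    /** Hypothetical stand-in for the walkthrough's Transaction entity. */
    static final class Tx {
        final long accountId;
        final long timestamp; // milliseconds
        final double amount;

        Tx(long accountId, long timestamp, double amount) {
            this.accountId = accountId;
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    // Per-account timestamp of the most recent small transaction; stands in for keyed state.
    private final Map<Long, Long> lastSmallAt = new HashMap<>();

    /** True when tx is a large transaction directly following a small one within a minute. */
    boolean isSuspicious(Tx tx) {
        // The flag is consumed by the very next transaction on the same account.
        Long smallAt = lastSmallAt.remove(tx.accountId);
        boolean alert = smallAt != null
                && tx.amount > LARGE_AMOUNT
                && tx.timestamp - smallAt <= ONE_MINUTE;
        if (tx.amount < SMALL_AMOUNT) {
            lastSmallAt.put(tx.accountId, tx.timestamp);
        }
        return alert;
    }

    /** Applies the rule to transactions in order and returns the account ids that raised alerts. */
    static List<Long> detect(List<Tx> txs) {
        FraudRuleSketch rule = new FraudRuleSketch();
        List<Long> alerts = new ArrayList<>();
        for (Tx tx : txs) {
            if (rule.isSuspicious(tx)) {
                alerts.add(tx.accountId);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Tx> txs = Arrays.asList(
                new Tx(1, 0, 0.50),        // small, sets the flag for account 1
                new Tx(2, 1_000, 600.00),  // large, but no preceding small one
                new Tx(1, 30_000, 700.00), // large, 30 s after the small one: alert
                new Tx(3, 0, 0.10),        // small, sets the flag for account 3
                new Tx(3, 120_000, 900.00) // large, but two minutes later: no alert
        );
        System.out.println(detect(txs)); // prints [1]
    }
}
```

Keeping the flag per account mirrors what keyBy(Transaction::getAccountId) provides in the job: each account's transactions are evaluated against that account's own state only.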
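3 Submit and run
- Run mvn package to build the jar; it is written to the project's target directory
- Upload the jar through the Flink Web UI and run the job from there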
