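I need Flink for work again, so I am picking it back up, starting with the simplest possible demo.
1 Create the project
Suppose you need to build a DataStream API project from scratch. How do you set up the basic project skeleton?
Use Maven and run the following command: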
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.19.0 \
    -DgroupId=com.tarsocial.demo \
    -DartifactId=flink-ad-demo \
    -Dversion=0.1 \
    -Dpackage=com.plough.demo.flink \
    -DinteractiveMode=false
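2 Generated files
The archetype generates two source files:
- FraudDetectionJob: the entry-point class, which defines the execution flow of the streaming job
- FraudDetector: a sample operator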
 
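// FraudDetectionJob.java
package com.plough.demo.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Skeleton code for the datastream walkthrough
 */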
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
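        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: a stream of sample transactions provided by the walkthrough library.
        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");
        // Partition by account id so each account is handled by the same FraudDetector instance.
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");
        // Sink: hand every alert to AlertSink.
        alerts
            .addSink(new AlertSink())
            .name("send-alerts");
        // Nothing runs until execute() is called; the calls above only build the job graph.
        env.execute("Fraud Detection");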
    }
}
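// FraudDetector.java
package com.plough.demo.flink;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

/**
 * Skeleton code for implementing a fraud detector.
 */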
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
    private static final long serialVersionUID = 1L;
    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;
    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {
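        // The skeleton emits one alert per transaction; no real detection logic yet.
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        // Blocks the task thread for 1 s per record to slow the output down; demo only.
        Thread.sleep(1000);
        collector.collect(alert);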
    }
}
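The skeleton declares SMALL_AMOUNT, LARGE_AMOUNT and ONE_MINUTE but never uses them. As a rough illustration of what they are for, here is a minimal plain-Java sketch of a "small transaction followed by a large one within a minute" rule. The rule is taken from the official Flink walkthrough rather than from this demo, and the class name FraudRuleSketch, the process method and the HashMap are hypothetical stand-ins: a real FraudDetector would presumably keep this flag in Flink keyed state and clear it with a timer.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of a small-then-large rule; it has no Flink dependency. */
public class FraudRuleSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000;

    // Hypothetical stand-in for per-key state:
    // account id -> timestamp (ms) of a small transaction seen just before.
    private final Map<Long, Long> lastSmallTxTime = new HashMap<>();

    /** Processes one transaction and returns true if it should raise an alert. */
    public boolean process(long accountId, long timestampMs, double amount) {
        // The flag only applies to the very next transaction, so consume it here.
        Long smallAt = lastSmallTxTime.remove(accountId);
        boolean alert = smallAt != null
                && amount > LARGE_AMOUNT
                && timestampMs - smallAt <= ONE_MINUTE;
        // Remember a small transaction so the next one can be checked against it.
        if (amount < SMALL_AMOUNT) {
            lastSmallTxTime.put(accountId, timestampMs);
        }
        return alert;
    }

    public static void main(String[] args) {
        FraudRuleSketch rule = new FraudRuleSketch();
        System.out.println(rule.process(1L, 0L, 0.50));        // prints false
        System.out.println(rule.process(1L, 30_000L, 600.00)); // prints true
        System.out.println(rule.process(2L, 0L, 0.50));        // prints false
        System.out.println(rule.process(2L, 90_000L, 600.00)); // prints false (more than a minute later)
    }
}
```

The map keyed by account id plays the role that keyBy(Transaction::getAccountId) plays in the job: each account is evaluated independently of the others.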
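3 Package and run
- Run mvn package to build the jar (with the settings above it should land at target/flink-ad-demo-0.1.jar)
- Upload the jar through the Flink Web UI and submit the job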
 
