创建并运行一个 Flink Stream Api 的 HelloWorld 项目

工作需要,重拾 Flink,从最简单的 demo 开始上手。

1 创建项目

假设需要从零开发一个 Stream Api 的项目,怎样搭建项目基础框架呢?

使用 maven,执行以下命令:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.19.0 \
    -DgroupId=com.tarsocial.demo \
    -DartifactId=flink-ad-demo \
    -Dversion=0.1 \
    -Dpackage=com.plough.demo.flink \
    -DinteractiveMode=false

2 文件解释

会生成两个代码文件。

  • FraudDetectionJob,流处理任务的执行流程,入口类
  • FraudDetector,一个示例算子
/**
 * Skeleton code for the datastream walkthrough
 */
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}
/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        Thread.sleep(1000);

        collector.collect(alert);
    }
}

3 提交运行

  1. 执行 mvn package,打出 jar 包
  2. 使用 Flink 的 WebUI,上传 jar 包,运行

4 参考资料