Flink tutor 学习小结

这两天玩了几个 Flink 的 tutor,感觉蛮好的,篇幅短小,把大数据流式/batch处理的用法都讲到了。我简单总结一下,大家感兴趣可以去深入看看:

1. First Steps

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/local_installation/

本地启动 Flink 集群,并提交一个示例任务。可以对 Flink 有个大概了解。

2. Fraud Detection with the DataStream API

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/

欺诈检测。
输入:转账记录的数据流。输出:如果检测到异常的交易记录,就报警,打印到控制台
重点:
这里面演示了 DataStream api,以及 Flink 中有状态的实时流的 best practice。

3. Real Time Reporting with the Table API

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/table_api/

实时ETL+数据可视化。输入:实时产生的转账记录。输出:一个实时更新的 dashboard。

重点:

  • 数据流架构:实时产生的记录=>Kafka=>Flink=>MySQL=>Grafana。Kafka 存放实时产生的无界数据,Flink 做 ETL,MySQL 作为数仓存储结果,Grafana 作为 BI(其 dashboard 中有简单的 SQL)
  • 里面演示的是 TableApi,我也是第一次用。对于简单的数据转换(SQL 能够表达的),写出来的代码非常干净,代码量极少。
  • 如果使用 TableApi,对于业务代码,批处理和流处理的代码是相同的。假如我们写的是流处理代码,可以在UT中用批处理来测试。

核心代码摘要

定义输入输出:
public static void main(String[] args) throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    tEnv.executeSql("CREATE TABLE transactions (\n" +
            "    account_id  BIGINT,\n" +
            "    amount      BIGINT,\n" +
            "    transaction_time TIMESTAMP(3),\n" +
            "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'topic'     = 'transactions',\n" +
            "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
            "    'format'    = 'csv'\n" +
            ")");

    tEnv.executeSql("CREATE TABLE spend_report (\n" +
            "    account_id BIGINT,\n" +
            "    log_ts     TIMESTAMP(3),\n" +
            "    amount     BIGINT\n," +
            "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
            ") WITH (\n" +
            "  'connector'  = 'jdbc',\n" +
            "  'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
            "  'table-name' = 'spend_report',\n" +
            "  'driver'     = 'com.mysql.jdbc.Driver',\n" +
            "  'username'   = 'sql-demo',\n" +
            "  'password'   = 'demo-sql'\n" +
            ")");

    Table transactions = tEnv.from("transactions");
    report(transactions).executeInsert("spend_report");
}

定义转换逻辑:

    public static Table report(Table transactions) {
//        return transactions.select(
//                $("account_id"),
////                $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
//                call(MyFloor.class, $("transaction_time")).as("log_ts"),
//                $("amount"))
//            .groupBy($("account_id"), $("log_ts"))
//            .select(
//                $("account_id"),
//                $("log_ts"),
//                $("amount").sum().as("amount"));
//

        return transactions
            .window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
            .groupBy($("account_id"), $("log_ts"))
            .select(
                $("account_id"),
                $("log_ts").start().as("log_ts"),
                $("amount").sum().as("amount"));
    }