Flink tutor 学习小结
- Flink
- 2021-12-17
- 552热度
- 0评论
这两天玩了几个 Flink 的 tutor,感觉蛮好的,篇幅短小,把大数据流式/batch处理的用法都讲到了。我简单总结一下,大家感兴趣可以去深入看看:
1. First Steps
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/local_installation/
本地启动 Flink 集群,并提交一个示例任务。可以对 Flink 有个大概了解。
2. Fraud Detection with the DataStream API
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/
欺诈检测。
输入:转账记录的数据流。输出:如果检测到异常的交易记录,就报警,打印到控制台
重点:
这里面演示了 DataStream api,以及 Flink 中有状态的实时流的 best practice。
3. Real Time Reporting with the Table API
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/table_api/
实时ETL+数据可视化。输入:实时产生的转账记录。输出:一个实时更新的 dashboard。
重点:
- 数据流架构:实时产生的记录=>Kafka=>Flink=>MySQL=>Grafana。Kafka 存放实时产生的无界数据,Flink 做 ETL,MySQL 作为数仓存储结果,Grafana 作为 BI(其 dashboard 中有简单的 SQL)
- 里面演示的是 TableApi,我也是第一次用。对于简单的数据转换(SQL 能够表达的),写出来的代码非常干净,代码量极少。
- 如果使用 TableApi,对于业务代码,批处理和流处理的代码是相同的。假如我们写的是流处理代码,可以在UT中用批处理来测试。
核心代码摘要
定义输入输出:
public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.executeSql("CREATE TABLE transactions (\n" + " account_id BIGINT,\n" + " amount BIGINT,\n" + " transaction_time TIMESTAMP(3),\n" + " WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'transactions',\n" + " 'properties.bootstrap.servers' = 'kafka:9092',\n" + " 'format' = 'csv'\n" + ")"); tEnv.executeSql("CREATE TABLE spend_report (\n" + " account_id BIGINT,\n" + " log_ts TIMESTAMP(3),\n" + " amount BIGINT\n," + " PRIMARY KEY (account_id, log_ts) NOT ENFORCED" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://mysql:3306/sql-demo',\n" + " 'table-name' = 'spend_report',\n" + " 'driver' = 'com.mysql.jdbc.Driver',\n" + " 'username' = 'sql-demo',\n" + " 'password' = 'demo-sql'\n" + ")"); Table transactions = tEnv.from("transactions"); report(transactions).executeInsert("spend_report"); }
定义转换逻辑:
public static Table report(Table transactions) { // return transactions.select( // $("account_id"), //// $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"), // call(MyFloor.class, $("transaction_time")).as("log_ts"), // $("amount")) // .groupBy($("account_id"), $("log_ts")) // .select( // $("account_id"), // $("log_ts"), // $("amount").sum().as("amount")); // return transactions .window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts")) .groupBy($("account_id"), $("log_ts")) .select( $("account_id"), $("log_ts").start().as("log_ts"), $("amount").sum().as("amount")); }