这两天玩了几个 Flink 的 tutor,感觉蛮好的,篇幅短小,把大数据流式/batch处理的用法都讲到了。我简单总结一下,大家感兴趣可以去深入看看:
1. First Steps
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/local_installation/
本地启动 Flink 集群,并提交一个示例任务。可以对 Flink 有个大概了解。
2. Fraud Detection with the DataStream API
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/
欺诈检测。
输入:转账记录的数据流。输出:如果检测到异常的交易记录,就报警,打印到控制台
重点:
这里面演示了 DataStream api,以及 Flink 中有状态的实时流的 best practice。
3. Real Time Reporting with the Table API
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/table_api/
实时ETL+数据可视化。输入:实时产生的转账记录。输出:一个实时更新的 dashboard。
重点:
- 数据流架构:实时产生的记录=>Kafka=>Flink=>MySQL=>Grafana。Kafka 存放实时产生的无界数据,Flink 做 ETL,MySQL 作为数仓存储结果,Grafana 作为 BI(其 dashboard 中有简单的 SQL)
- 里面演示的是 TableApi,我也是第一次用。对于简单的数据转换(SQL 能够表达的),写出来的代码非常干净,代码量极少。
- 如果使用 TableApi,对于业务代码,批处理和流处理的代码是相同的。假如我们写的是流处理代码,可以在UT中用批处理来测试。
核心代码摘要
定义输入输出:
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE transactions (\n" +
" account_id BIGINT,\n" +
" amount BIGINT,\n" +
" transaction_time TIMESTAMP(3),\n" +
" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'transactions',\n" +
" 'properties.bootstrap.servers' = 'kafka:9092',\n" +
" 'format' = 'csv'\n" +
")");
tEnv.executeSql("CREATE TABLE spend_report (\n" +
" account_id BIGINT,\n" +
" log_ts TIMESTAMP(3),\n" +
" amount BIGINT\n," +
" PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
" 'table-name' = 'spend_report',\n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'sql-demo',\n" +
" 'password' = 'demo-sql'\n" +
")");
Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");
}
定义转换逻辑:
public static Table report(Table transactions) {
// return transactions.select(
// $("account_id"),
//// $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
// call(MyFloor.class, $("transaction_time")).as("log_ts"),
// $("amount"))
// .groupBy($("account_id"), $("log_ts"))
// .select(
// $("account_id"),
// $("log_ts"),
// $("amount").sum().as("amount"));
//
return transactions
.window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
.groupBy($("account_id"), $("log_ts"))
.select(
$("account_id"),
$("log_ts").start().as("log_ts"),
$("amount").sum().as("amount"));
}
