记一次 Flink Job 调优经验

问题描述

素材计算的 Job,涉及对 SR 的高频复杂查询,核心算子开了 60 个并行度,window 大小为 15s,吞吐量不够大(跟不上上游的生产速度),容易出现 SQL 异常导致 Job 重启。

优化之后,仅用 24 个并行度,跑出了之前 60 个并行度 3 倍以上的吞吐量,而且系统不再重启。

优化思路

  • 主要是对一张大表(素材表)用 IN 语句查询,需要限制单次查询的 ID 个数。
  • 对于复杂查询来说,如果 SR 的并发过大(例如超过 30-50 个),会导致部分查询直接失败。因此,要降低 SR 的查询并发,不能开 60 个并行度。
  • 既然并发数调低了,要相应提高单次查询的量,以提高整体吞吐量。一方面,是调大单批次的查询上限;另一方面,是加大 window 的时间,以便触发查询时有足够多的待处理数据。
  • 还是可能出现 SQL 异常,但不是必现的,一般重试能够成功。于是,增加失败重试机制。

具体细节

分批、失败重试

private Map<String, DwsMaterialInfo> getMaterialInfo(Set<String> materialIds) throws SQLException {
    // 将 materialIds 转为 List
    List<String> materialIdList = new ArrayList<>(materialIds);

    // 使用 Stream API 对 materialIds 分组处理
    return IntStream.range(0, materialIdList.size())
            .boxed()
            .collect(Collectors.groupingBy(i -> i / BATCH_LIMIT)) // 按照批次分组
            .values().stream()  // 获取分组后的子列表集合
            .map(group -> group.stream().map(materialIdList::get).collect(Collectors.toList())) // 将索引转换为实际的 materialId 列表
            .map(batchMaterialIds -> {
                int attempt = 0;
                while (true) {
                    try {
                        log.info("Attempting to get material info, attempt: {}", attempt + 1);
                        return doGetMaterialInfo(batchMaterialIds);
                    } catch (SQLException e) {
                        attempt++;
                        log.error("SQLException occurred on attempt {}, retrying...", attempt, e);
                        if (attempt >= MAX_RETRIES) {
                            log.error("Max retry attempts reached. Failing with error.", e);
                            throw new RuntimeException("Failed to get material info after " + MAX_RETRIES + " attempts", e);
                        }
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException ie) {
                            throw new RuntimeException("Thread was interrupted", ie);
                        }
                    }
                }
            }) // 对每个分组批次调用 doGetMaterialInfo
            .flatMap(batchRes -> batchRes.entrySet().stream()) // 扁平化 Map<Entry> 流
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // 收集为最终的 Map
}

Window 大小调整为 120s

并发数调为 24(用二分法,实际测试出来的值)

心得体会

  • 并发数不是“银弹”,并非越大越好。对于 SR 来说,并发数超过某个阈值后,会导致查询失败。(任何一个系统都应该存在这样的阈值)
  • 适当提高 batch 的大小,来增加吞吐量。kafka 的优化也是如此,增加单次拉取的数据量,可以显著提高吞吐。(《辛雷学习方法》中,提到的“每次最长学习时间”,就是这个意思。真正的学霸,batch 都可以很大)
  • lambda 表达式要多用。好用。