记一次 Flink Job 调优经验
- Flink
- 2024-09-15
- 178热度
- 0评论
导航
问题描述
素材计算的 Job,涉及对 SR 的高频复杂查询,核心算子开了 60 个并行度,window 大小为 15s,吞吐量不够大(跟不上上游的生产速度),容易出现 SQL 异常导致 Job 重启。
优化之后,仅用 24 个并行度,跑出了之前 60 个并行度 3 倍以上的吞吐量,而且系统不再重启。
优化思路
- 主要是对一张大表(素材表)用 IN 语句查询,需要限制单次查询的 ID 个数。
- 对于复杂查询来说,如果 SR 的并发过大(例如超过 30-50 个),会导致部分查询直接失败。因此,要降低 SR 的查询并发,不能开 60 个并行度。
- 既然并发数调低了,要相应提高单次查询的量,以提高整体吞吐量。一方面,是调大单批次的查询上限;另一方面,是加大 window 的时间,以便触发查询时有足够多的待处理数据。
- 还是可能出现 SQL 异常,但不是必现的,一般重试能够成功。于是,增加失败重试机制。
具体细节
分批、失败重试
private Map<String, DwsMaterialInfo> getMaterialInfo(Set<String> materialIds) throws SQLException { // 将 materialIds 转为 List List<String> materialIdList = new ArrayList<>(materialIds); // 使用 Stream API 对 materialIds 分组处理 return IntStream.range(0, materialIdList.size()) .boxed() .collect(Collectors.groupingBy(i -> i / BATCH_LIMIT)) // 按照批次分组 .values().stream() // 获取分组后的子列表集合 .map(group -> group.stream().map(materialIdList::get).collect(Collectors.toList())) // 将索引转换为实际的 materialId 列表 .map(batchMaterialIds -> { int attempt = 0; while (true) { try { log.info("Attempting to get material info, attempt: {}", attempt + 1); return doGetMaterialInfo(batchMaterialIds); } catch (SQLException e) { attempt++; log.error("SQLException occurred on attempt {}, retrying...", attempt, e); if (attempt >= MAX_RETRIES) { log.error("Max retry attempts reached. Failing with error.", e); throw new RuntimeException("Failed to get material info after " + MAX_RETRIES + " attempts", e); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException ie) { throw new RuntimeException("Thread was interrupted", ie); } } } }) // 对每个分组批次调用 doGetMaterialInfo .flatMap(batchRes -> batchRes.entrySet().stream()) // 扁平化 Map<Entry> 流 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // 收集为最终的 Map }
Window 大小调整为 120s
并发数调为 24(用二分法,实际测试出来的值)
心得体会
- 并发数不是“银弹”,并非越大越好。对于 SR 来说,并发数超过某个阈值后,会导致查询失败。(任何一个系统都应该存在这样的阈值)
- 适当提高 batch 的大小,来增加吞吐量。kafka 的优化也是如此,增加单次拉取的数据量,可以显著提高吞吐。(《辛雷学习方法》中,提到的“每次最长学习时间”,就是这个意思。真正的学霸,batch 都可以很大)
- lambda 表达式要多用。好用。