大家好,欢迎来到IT知识分享网。
大数据之Flink优化总结2
第3章 反压处理
概述
Flink网络流控及反压的介绍:
https://flink-learning.org.cn/article/detail/138316d1556f8f9d34e517d04d670626
反压的理解
简单来说,Flink 拓扑中每个节点(Task)间的数据都以阻塞队列的方式传输,下游来不及消费导致队列被占满后,上游的生产也会被阻塞,最终导致数据源的摄入被阻塞。
反压(BackPressure)通常产生于这样的场景:短时间的负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或遇到大促、秒杀活动导致流量陡增、数据倾斜。
反压的危害
反压如果不能得到正确的处理,可能会影响到checkpoint 时长和 state 大小,甚至可能会导致资源耗尽甚至系统崩溃。
1)影响checkpoint时长:barrier 不会越过普通数据,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,导致 checkpoint 总体时间(End to End Duration)变长。
2)影响state大小:barrier对齐时,接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到state 里面,导致 checkpoint 变大。
这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。
因此,我们在生产中要尽量避免出现反压的情况。
定位反压节点
解决反压首先要做的是定位到造成反压的节点,排查的时候,先把operator chain禁用,方便定位到具体算子,开启disableOperatorChaining。
提交UvDemo:
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
利用 Flink Web UI 定位
Flink Web UI 的反压监控提供了 SubTask 级别的反压监控,1.13版本以前是通过周期性对 Task 线程的栈信息采样,得到线程被阻塞在请求 Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。默认配置下,这个频率在 0.1 以下则为 OK(绿底),0.1 至 0.5 为 LOW(黄底),而超过 0.5 则为 HIGH(红底)。
Flink 1.13 优化了反压检测的逻辑(使用基于任务 Mailbox 计时,而不在再于堆栈采样),并且重新实现了作业图的 UI 展示:Flink 现在在 UI 上通过颜色和数值来展示繁忙和反压的程度。(禁用算子链后,单个算子则为一个框图显示)。
1)通过WebUI看到Map算子处于反压:
3)分析瓶颈算子
如果处于反压状态,那么有两种可能性:
(1)该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的 Operator(比如 flatmap)。这种情况,该节点是反压的根源节点,它是从 Source Task 到 Sink Task 的第一个出现反压的节点。
(2)下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。这种情况,需要继续排查下游节点,一直找到第一个为OK的一般就是根源节点。
总体来看,如果我们找到第一个出现反压的节点,反压根源要么是就这个节点,要么是它紧接着的下游节点。
通常来讲,第二种情况更常见。如果无法确定,还需要结合 Metrics进一步判断。
第一个为OK的反压:
第一个source的反压:
利用Metrics定位
监控反压时会用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有关,最为有用的是以下几个 Metrics:
Metris |
描述 |
outPoolUsage |
发送端 Buffer 的使用率 |
inPoolUsage |
接收端 Buffer 的使用率 |
floatingBuffersUsage(1.9 以上) |
接收端 Floating Buffer 的使用率 |
exclusiveBuffersUsage(1.9 以上) |
接收端 Exclusive Buffer 的使用率 |
其中 inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage。
1)根据指标分析反压
分析反压的大致思路是:如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。反压情况可以根据以下表格进行对号入座(1.9以上):
outPoolUsage低 |
outPoolUsage高 |
|
inPoolUsage低 |
正常 |
被下游反压,处于临时情况 (还没传递到上游) |
可能是反压的根源,一条输入多条输出的场景 |
||
inPoolUsage高 |
如果上游所有outPoolUsage都是低,有可能最终可能导致反压(还没传递到上游) |
被下游反压 |
如果上游的outPoolUsage是高,则为反压根源 |
2)可以进一步分析数据传输
Flink 1.9及以上版本,还可以根据 floatingBuffersUsage/exclusiveBuffersUsage 以及其上游 Task 的 outPoolUsage 来进行进一步的分析一个 Subtask 和其上游 Subtask 的数据传输。
在流量较大时,Channel 的 Exclusive Buffer 可能会被写满,此时 Flink 会向 Buffer Pool 申请剩余的 Floating Buffer。这些 Floating Buffer 属于备用 Buffer。
exclusiveBuffersUsage高 |
||
floatingBuffersUsage 低 所有上游outPoolUsage 低 |
正常 |
|
floatingBuffersUsage 低 上游某个outPoolUsage 高 |
潜在的网络瓶颈 |
|
floatingBuffersUsage 高 所有上游outPoolUsage 低 |
最终对部分inputChannel反压(正在传递) |
最终对大多数或所有inputChannel反压(正在传递) |
floatingBuffersUsage 高 上游某个outPoolUsage 高 |
只对部分inputChannel反压 |
对大多数或所有inputChannel反压 |
总结:
1)floatingBuffersUsage 为高,则表明反压正在传导至上游
2)同时exclusiveBuffersUsage为低,则表明可能有倾斜
比如,floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数 channel 占用了大部分的 Floating Buffer。
反压的原因及处理
注意:反压可能是暂时的,可能是由于负载高峰、CheckPoint 或作业重启引起的数据积压而导致反压。如果反压是暂时的,应该忽略它。另外,请记住,断断续续的反压会影响我们分析和解决问题。
定位到反压节点后,分析造成原因的办法主要是观察 Task Thread。按照下面的顺序,一步一步去排查。1.数据倾斜,2.资源,3.频繁GC(老年代过多,full GC),4.代码。
查看是否数据倾斜
在实践中,很多情况下的反压是由于数据倾斜造成的,这点我们可以通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。
(关于数据倾斜的详细解决方案,会在下一章节详细讨论)
使用火焰图分析
如果不是数据倾斜,最常见的问题可能是用户代码的执行效率问题(频繁被阻塞或者性能问题),需要找到瓶颈算子中的哪部分计算逻辑消耗巨大。
最有用的办法就是对 TaskManager 进行 CPU profile,从中我们可以分析到 Task Thread 是否跑满一个 CPU 核:如果是的话要分析 CPU 主要花费在哪些函数里面;如果不是的话要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是 checkpoint 或者 GC 等系统活动导致的暂时系统暂停。
1)开启火焰图功能
Flink 1.13直接在 WebUI 提供 JVM 的 CPU 火焰图,这将大大简化性能瓶颈的分析,默认是不开启的,需要修改参数:
rest.flamegraph.enabled: true #默认false
也可以在提交时指定:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Drest.flamegraph.enabled=true \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
2)WebUI查看火焰图
火焰图是通过对堆栈跟踪进行多次采样来构建的。每个方法调用都由一个条形表示,其中条形的长度与其在样本中出现的次数成正比。
- On-CPU: 处于 [RUNNABLE, NEW]状态的线程
- Off-CPU: 处于 [TIMED_WAITING, WAITING, BLOCKED]的线程,用于查看在样本中发现的等待中的阻塞调用。
3)分析火焰图
颜色没有特殊含义,具体查看:
-
-
- 纵向是调用链,从下往上,顶部就是正在执行的函数
- 横向是样本出现次数,可以理解为执行时长。
-
看顶层的哪个函数占据的宽度最大。只要有”平顶”(plateaus),就表示该函数可能存在性能问题。
如果是Flink 1.13以前的版本,可以手动做火焰图:
如何生成火焰图:http://www.54tianzhisheng.cn/2020/10/05/flink-jvm-profiler/
分析GC情况
TaskManager 的内存以及 GC 问题也可能会导致反压,包括 TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。通常建议使用默认的G1垃圾回收器。
可以通过打印GC日志(-XX:+PrintGCDetails),使用GC 分析器(GCViewer工具)来验证是否处于这种情况。
- 在Flink提交脚本中,设置JVM参数,打印GC日志:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Denv.java.opts=”-XX:+PrintGCDetails -XX:+PrintGCDateStamps” \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
- 下载GC日志的方式:
因为是on yarn模式,运行的节点一个一个找比较麻烦。可以打开WebUI,选择JobManager或者TaskManager,点击Stdout,即可看到GC日志,点击下载按钮即可将GC日志通过HTTP的方式下载下来。
- 分析GC日志:
通过 GC 日志分析出单个 Flink Taskmanager 堆总大小、年轻代、老年代分配的内存空间、Full GC 后老年代剩余大小等,相关指标定义可以去 Github 具体查看。
GCViewer地址:https://github.com/chewiebug/GCViewer
Linux下分析:
java -jar gcviewer_1.3.4.jar gc.log
Windows下分析:
直接双击gcviewer_1.3.4.jar,打开GUI界面,选择gc的log打开
扩展:最重要的指标是Full GC 后,老年代剩余大小这个指标,按照《Java 性能优化权威指南》这本书 Java 堆大小计算法则,设 Full GC 后老年代剩余大小空间为 M,那么堆的大小建议 3 ~ 4倍 M,新生代为 1 ~ 1.5 倍 M,老年代应为 2 ~ 3 倍 M。
外部组件交互
如果发现我们的 Source 端数据读取性能比较低或者 Sink 端写入性能较差,需要检查第三方组件是否遇到瓶颈,还有就是做维表join时的性能问题。
例如:
Kafka 集群是否需要扩容,Kafka 连接器是否并行度较低
HBase 的 rowkey 是否遇到热点问题,是否请求处理不过来
ClickHouse并发能力较弱,是否达到瓶颈
……
关于第三方组件的性能问题,需要结合具体的组件来分析,最常用的思路:
1)异步io+热缓存来优化读写性能
2)先攒批再读写
维表join参考:
https://flink-learning.org.cn/article/detail/b8df32fbc6542257a5b449114e137cc3
https://www.jianshu.com/p/a62fa483ff54
数据倾斜
判断是否存在数据倾斜
相同 Task 的多个 Subtask 中,个别Subtask 接收到的数据量明显大于其他 Subtask 接收到的数据量,通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多少数据,即可判断出 Flink 任务是否存在数据倾斜。通常,数据倾斜也会引起反压。
另外, 有时Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。
数据倾斜的解决
keyBy 后的聚合操作存在数据倾斜
提交案例:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo1 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
–local-keyby false
查看webui:
1)为什么不能直接用二次聚合来处理
Flink是实时流处理,如果keyby之后的聚合操作存在数据倾斜,且没有开窗口(没攒批)的情况下,简单的认为使用两阶段聚合,是不能解决问题的。因为这个时候Flink是来一条处理一条,且向下游发送一条结果,对于原来keyby的维度(第二阶段聚合)来讲,数据量并没有减少,且结果重复计算(非FlinkSQL,未使用回撤流),如下图所示:
2)使用LocalKeyBy的思想
在 keyBy 上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后,再发送到下游,使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类似MapReduce 中 Combiner 的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。
实现方式:
- DataStreamAPI需要自己写代码实现
- SQL可以指定参数,开启miniBatch和LocalGlobal功能(推荐,后续介绍)
3)DataStream API自定义实现的案例
以计算每个mid出现的次数为例,keyby之前,使用flatMap实现LocalKeyby功能
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class LocalKeyByFlatMapFunc extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> implements CheckpointedFunction {
//Checkpoint 时为了保证 Exactly Once,将 buffer 中的数据保存到该 ListState 中
private ListState<Tuple2<String, Long>> listState;
//本地 buffer,存放 local 端缓存的 mid 的 count 信息
private HashMap<String, Long> localBuffer;
//缓存的数据量大小,即:缓存多少数据再向下游发送
private int batchSize;
//计数器,获取当前批次接收的数据量
private AtomicInteger currentSize;
//构造器,批次大小传参
public LocalKeyByFlatMapFunc(int batchSize) {
this.batchSize = batchSize;
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
// 1、将新来的数据添加到 buffer 中,本地聚合
Long count = localBuffer.getOrDefault(value.f0, 0L);
localBuffer.put(value.f0, count + 1);
// 2、如果到达设定的批次,则将 buffer 中的数据发送到下游
if (currentSize.incrementAndGet() >= batchSize) {
// 2.1 遍历 Buffer 中数据,发送到下游
for (Map.Entry<String, Long> midAndCount : localBuffer.entrySet()) {
out.collect(Tuple2.of(midAndCount.getKey(), midAndCount.getValue()));
}
// 2.2 Buffer 清空,计数器清零
localBuffer.clear();
currentSize.set(0);
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 将 buffer 中的数据保存到状态中,来保证 Exactly Once
listState.clear();
for (Map.Entry<String, Long> midAndCount : localBuffer.entrySet()) {
listState.add(Tuple2.of(midAndCount.getKey(), midAndCount.getValue()));
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 从状态中恢复 buffer 中的数据
listState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<Tuple2<String, Long>>(
“localBufferState”,
Types.TUPLE(Types.STRING, Types.LONG)
)
);
localBuffer = new HashMap();
if (context.isRestored()) {
// 从状态中恢复数据到 buffer 中
for (Tuple2<String, Long> midAndCount : listState.get()) {
// 如果出现 pv != 0,说明改变了并行度,ListState 中的数据会被均匀分发到新的 subtask中
// 单个 subtask 恢复的状态中可能包含多个相同的 mid 的 count数据
// 所以每次先取一下buffer的值,累加再put
long count = localBuffer.getOrDefault(midAndCount.f0, 0L);
localBuffer.put(midAndCount.f0, count + midAndCount.f1);
}
// 从状态恢复时,默认认为 buffer 中数据量达到了 batchSize,需要向下游发
currentSize = new AtomicInteger(batchSize);
} else {
currentSize = new AtomicInteger(0);
}
}
}
提交localkeyby案例:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo1 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
–local-keyby true
查看webui:
可以看到每个subtask处理的数据量基本均衡,另外处理的数据量相比原先少了很多。
keyBy 之前发生数据倾斜
如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些 partition 的数据量较少。对于不存在 keyBy 的 Flink 任务也会出现该情况。
这种情况,需要让 Flink 任务强制进行shuffle。使用shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。
keyBy 后的窗口聚合操作存在数据倾斜
因为使用了窗口,变成了有界数据(攒批)的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
1)实现思路:
- 第一阶段聚合:key拼接随机数前缀或后缀,进行keyby、开窗、聚合
注意:聚合完不再是WindowedStream,要获取WindowEnd(窗口结束时间)作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起),开窗中打散的数据仍会处于原来的窗口当中,这时候才进行聚合,这时候相同窗口中的数据就可以看成一条普通的流,数据量才会减少和不发生错误。
- 第二阶段聚合:按照原来的key及windowEnd作keyby、聚合
2)提交原始案例
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo2 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
–two-phase false
查看WebUI:
3)提交两阶段聚合的案例
public class SkewDemo2 {
public static void main(String[] args) throws Exception {
// Configuration conf = new Configuration();
// conf.set(RestOptions.ENABLE_FLAMEGRAPH, true);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// env.setParallelism(1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
env.setStateBackend(new HashMapStateBackend());
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(3), CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(“hdfs://hadoop1:8020/flink-tuning/ck”);
// checkpointConfig.setCheckpointStorage(“file:///F:/flink-tuning/test/ck”);
checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(3));
checkpointConfig.setTolerableCheckpointFailureNumber(5);
checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
SingleOutputStreamOperator<JSONObject> jsonobjDS = env
.addSource(new MockSourceFunction())
.map(data -> JSONObject.parseObject(data));
// 过滤出 页面数据,转换成 (mid,1L)
SingleOutputStreamOperator<Tuple2<String, Long>> pageMidTuple = jsonobjDS
.filter(data -> StringUtils.isEmpty(data.getString(“start”)))
.map(r -> Tuple2.of(r.getJSONObject(“common”).getString(“mid”), 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 按照mid分组,统计每10s,各mid出现的次数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
boolean isTwoPhase = parameterTool.getBoolean(“two-phase”, true);
int randomNum = parameterTool.getInt(“random-num”, 5);
if (!isTwoPhase) {
pageMidTuple
.keyBy(r -> r.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
.print().setParallelism(1);
} else {
// 拼接随机数打散,第一次聚合(窗口聚合)
SingleOutputStreamOperator<Tuple3<String, Long, Long>> firstAgg = pageMidTuple
.map(new MapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
Random random = new Random();
@Override
public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
return Tuple2.of(value.f0 + “-” + random.nextInt(randomNum), 1L);
}
}) // mid拼接随机数
.keyBy(r -> r.f0) // 第一次按照 “mid|随机数” 分组
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(
(value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1),
new ProcessWindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple3<String, Long, Long>> out) throws Exception {
Tuple2<String, Long> midAndCount = elements.iterator().next();
long windowEndTs = context.window().getEnd();//获取窗口结束时间
out.collect(Tuple3.of(midAndCount.f0, midAndCount.f1, windowEndTs));//key,聚合结果,窗口结束时间戳
}
}
);// 窗口聚合(第一次聚合),加上窗口结束时间的标记,方便第二次聚合汇总
// 按照原来的 key和windowEnd分组,第二次聚合
firstAgg
.map(new MapFunction<Tuple3<String,Long,Long>, Tuple3<String,Long,Long>>() {
@Override
public Tuple3<String, Long, Long> map(Tuple3<String, Long, Long> value) throws Exception {
String originKey = value.f0.split(“-“)[0];
return Tuple3.of(originKey,value.f1 ,value.f2);
}//原来的key,统计值,窗口结束时间戳
}) // 去掉 拼接的随机数
.keyBy(new KeySelector<Tuple3<String, Long, Long>, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> getKey(Tuple3<String, Long, Long> value) throws Exception {
return Tuple2.of(value.f0, value.f2);
}//原来的key,窗口结束时间戳
}) // 按照 原来的 key和 窗口结束时间 分组
.reduce((value1, value2) -> Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2)) // 第二次真正聚合
.print().setParallelism(1);
}
env.execute();
}
}
提交代码:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo2 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
–two-phase true \
–random-num 16//随机打散成16份
查看WebUI:可以看到第一次打散的窗口聚合,比较均匀
第二次聚合,也比较均匀:
随机数范围,需要自己去测,因为keyby的分区器是(两次hash*下游并行度/最大并行度)
SQL写法参考:https://zhuanlan.zhihu.com/p/197299746
Job优化
使用DataGen造数据
开发完Flink作业,压测的方式很简单,先在kafka中积压数据,之后开启Flink任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。
数据可以是自己造的模拟数据,也可以是生产中的部分数据。造测试数据的工具:DataFactory、datafaker 、DBMonster、Data-Processer 、Nexmark、Jmeter等。
Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。
DataStream的DataGenerator
import com.atguigu.flink.tuning.bean.OrderInfo;
import com.atguigu.flink.tuning.bean.UserInfo;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
public class DataStreamDataGenDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set(RestOptions.ENABLE_FLAMEGRAPH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);
env.disableOperatorChaining();
//随机数:
SingleOutputStreamOperator<OrderInfo> orderInfoDS = env
.addSource(new DataGeneratorSource<>(new RandomGenerator<OrderInfo>() {
@Override
public OrderInfo next() {
return new OrderInfo(
random.nextInt(1, 100000),
random.nextLong(1, 1000000),
random.nextUniform(1, 1000),
System.currentTimeMillis());
}
}))
.returns(Types.POJO(OrderInfo.class));
//自增序列:
SingleOutputStreamOperator<UserInfo> userInfoDS = env
.addSource(new DataGeneratorSource<UserInfo>(
new SequenceGenerator<UserInfo>(1, 1000000) {
RandomDataGenerator random = new RandomDataGenerator();
@Override
public UserInfo next() {
return new UserInfo(
valuesToEmit.peek().intValue(),//队列中的数据读取后从队列中不删除
valuesToEmit.poll().longValue(),//队列中的数据读取后从队列中删除
random.nextInt(1, 100),
random.nextInt(0, 1));
}
}
))
.returns(Types.POJO(UserInfo.class));
orderInfoDS.print(“order>>”);
userInfoDS.print(“user>>”);
env.execute();
}
}
SQL的DataGenerator
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class SQLDataGenDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set(RestOptions.ENABLE_FLAMEGRAPH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);
env.disableOperatorChaining();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String orderSql=”CREATE TABLE order_info (\n” +
” id INT,\n” +
” user_id BIGINT,\n” +
” total_amount DOUBLE,\n” +
” create_time AS localtimestamp,\n” +
” WATERMARK FOR create_time AS create_time\n” +
“) WITH (\n” +
” ‘connector’ = ‘datagen’,\n” +
” ‘rows-per-second’=’20000’,\n” +//生成速率:每秒20000行数
” ‘fields.id.kind’=’sequence’,\n” +//字段id的类型(自增序列)
” ‘fields.id.start’=’1’,\n” +//字段id范围初始值1
” ‘fields.id.end’=’100000000’,\n” +//字段id范围结束值100000000
” ‘fields.user_id.kind’=’random’,\n” +/字段user_id的类型为随机序列
” ‘fields.user_id.min’=’1’,\n” +
” ‘fields.user_id.max’=’1000000’,\n” +
” ‘fields.total_amount.kind’=’random’,\n” +
” ‘fields.total_amount.min’=’1’,\n” +
” ‘fields.total_amount.max’=’1000’\n” +
“)”;
String userSql=”CREATE TABLE user_info (\n” +
” id INT,\n” +
” user_id BIGINT,\n” +
” age INT,\n” +
” sex INT\n” +
“) WITH (\n” +
” ‘connector’ = ‘datagen’,\n” +
” ‘rows-per-second’=’20000’,\n” +
” ‘fields.id.kind’=’sequence’,\n” +
” ‘fields.id.start’=’1’,\n” +
” ‘fields.id.end’=’100000000’,\n” +
” ‘fields.user_id.kind’=’sequence’,\n” +
” ‘fields.user_id.start’=’1’,\n” +
” ‘fields.user_id.end’=’1000000’,\n” +
” ‘fields.age.kind’=’random’,\n” +
” ‘fields.age.min’=’1’,\n” +
” ‘fields.age.max’=’100’,\n” +
” ‘fields.sex.kind’=’random’,\n” +
” ‘fields.sex.min’=’0’,\n” +
” ‘fields.sex.max’=’1’\n” +
“)”;
tableEnv.executeSql(orderSql);
tableEnv.executeSql(userSql);
tableEnv.executeSql(“select * from order_info”).print();
// tableEnv.executeSql(“select * from user_info”).print();
}
}
算子指定UUID
对于有状态的 Flink 应用,推荐给每个算子都指定唯一用户ID(UUID)。 严格地说,仅需要给有状态的算子设置就足够了。但是因为 Flink 的某些内置算子(如 window)是有状态的,而有些是无状态的,可能用户不是很清楚哪些内置算子是有状态的,哪些不是。所以从实践经验上来说,我们建议每个算子都指定上 UUID。
默认情况下,算子UID是根据JobGraph自动生成的,JobGraph的更改可能会导致UUID改变。手动指定算子 UUID ,可以让 Flink 有效地将算子的状态从 savepoint 映射到作业修改后(拓扑图可能也有改变)的正确的算子上。比如替换原来的Operator实现、增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。这是 savepoint 在 Flink 应用中正常工作的一个基本要素。
Flink 算子的 UUID 可以通过 uid(String uid) 方法指定,通常也建议指定name。
#算子.uid(“指定uid”)
.reduce((value1, value2) -> Tuple3.of(“uv”, value2.f1, value1.f2 + value2.f2))
.uid(“uv-reduce”).name(“uv-reduce”)
1)提交案例:未指定uid
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
触发保存点:
//直接触发
flink savepoint <jobId> [targetDirectory] [-yid yarnAppId] #on yarn模式需要指定-yid参数
//cancel触发
flink cancel -s [targetDirectory] <jobId> [-yid yarnAppId] #on yarn模式需要指定-yid参数
例如:
bin/flink cancel -s hdfs://hadoop1:8020/flink-tuning/sp 98acff568e8f0827a67ff37648a29d7f -yid application_1640503677810_0017
修改代码,从savepoint恢复(-s):
bin/flink run \
-t yarn-per-job \
-s hdfs://hadoop1:8020/flink-tuning/sp/savepoint-066c90-6edf948686f6 \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
报错如下:
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://hadoop1:8020/flink-tuning/sp/savepo
int-066c90-6edf948686f6. Cannot map checkpoint/savepoint state for operator ddb598ad156ed281023ba4eebbe487e3 to the new program,
because the operator is not available in the new program. If you want to allow to skip this, you can set the –allowNonRestoredState option on the CLI.
临时处理:在提交命令中添加–allowNonRestoredState (short: -n)跳过无法恢复的算子。
2)提交案例:指定uid
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UidDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
触发保存点:
//cancel触发savepoint
bin/flink cancel -s hdfs://hadoop1:8020/flink-tuning/sp 272e5d3321c5c1481cc327f6abe8cf9c -yid application_1640268344567_0033
修改代码,从保存点恢复(替换保存点路径):
bin/flink run \
-t yarn-per-job \
-s hdfs://hadoop1:8020/flink-tuning/sp/savepoint-272e5d-d0c1097d23e0 \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UidDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
链路延迟测量
对于实时的流式处理系统来说,我们需要关注数据输入、计算和输出的及时性,所以处理延迟是一个比较重要的监控指标,特别是在数据量大或者软硬件条件不佳的环境下。Flink提供了开箱即用的LatencyMarker机制来测量链路延迟。开启如下参数:
metrics.latency.interval: 30000 #默认0,表示禁用,单位毫秒
监控的粒度,分为以下3档:
- single:每个算子单独统计延迟;
- operator(默认值):每个下游算子都统计自己与Source算子之间的延迟;
- subtask:每个下游算子的sub-task都统计自己与Source算子的sub-task之间的延迟。
metrics.latency.granularity: operator #默认operator
一般情况下采用默认的operator粒度即可,这样在Sink端观察到的latency metric就是我们最想要的全链路(端到端)延迟。subtask粒度太细,会增大所有并行度的负担,不建议使用。
LatencyMarker不会参与到数据流的用户逻辑中的,而是直接被各算子转发并统计。为了让它尽量精确,有两点特别需要注意:
- 保证Flink集群内所有节点的时区、时间是同步的:ProcessingTimeService产生时间戳最终是靠System.currentTimeMillis()方法,可以用ntp等工具来配置。
- metrics.latency.interval的时间间隔宜大不宜小:一般配置成30000(30秒)左右。一是因为延迟监控的频率可以不用太频繁,二是因为LatencyMarker的处理也要消耗一定性能。
提交案例:
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dmetrics.latency.interval=30000 \
-c com.atguigu.flink.tuning.UidDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
可以通过下面的metric查看结果:
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency
端到端延迟的tag只有murmur hash过的算子ID(用uid()方法设定的),并没有算子名称,(https://issues.apache.org/jira/browse/FLINK-8592)并且官方暂时不打算解决这个问题,所以我们要么用最大值来表示,要么将作业中Sink算子的ID统一化。比如使用了Prometheus和Grafana来监控,效果如下:
开启对象重用
当调用了enableObjectReuse方法后,Flink会把中间深拷贝的步骤都省略掉,SourceFunction产生的数据直接作为MapFunction的输入,可以减少gc压力。但需要特别注意的是,这个方法不能随便调用,必须要确保下游Function只有一种,或者下游的Function均不会改变对象内部的值。否则可能会有线程安全的问题。
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dpipeline.object-reuse=true \
-Dmetrics.latency.interval=30000 \
-c com.atguigu.flink.tuning.UidDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
细粒度滑动窗口优化
1)细粒度滑动的影响
当使用细粒度的滑动窗口(窗口长度远远大于滑动步长)时,重叠的窗口过多,一个数据会属于多个窗口,性能会急剧下降。
我们经常会碰到这种需求:以3分钟的频率实时计算App内各个子模块近24小时的PV和UV。我们需要用粒度为1440 / 3 = 480的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:
- 状态
对于一个元素,会将其写入对应的(key, window)二元组所圈定的windowState状态中。如果粒度为480,那么每个元素到来,更新windowState时都要遍历480个窗口并写入,开销是非常大的。在采用RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。
- 定时器
每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。细粒度滑动窗口会造成维护的定时器增多,内存负担加重。
2)解决思路
DataStreamAPI中,自己解决(https://issues.apache.org/jira/browse/FLINK-7001)。
我们一般使用滚动窗口+在线存储+读时聚合的思路作为解决方案:
(1)从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度);
(2)每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase);
(3)扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。
3)细粒度的滑动窗口案例
提交案例:统计最近1小时的uv,1秒更新一次(滑动窗口)
public class SlideWindowDemo {
public static void main(String[] args) throws Exception {
// Configuration conf = new Configuration();
// conf.set(RestOptions.ENABLE_FLAMEGRAPH, true);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 0));
// env.setParallelism(1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
env.setStateBackend(new HashMapStateBackend());
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(3), CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(“hdfs://hadoop1:8020/flink-tuning/ck”);
// checkpointConfig.setCheckpointStorage(“file:///F:/flink-tuning/test/ck”);
checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(3));
checkpointConfig.setTolerableCheckpointFailureNumber(5);
checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
SingleOutputStreamOperator<JSONObject> jsonobjDS = env
.addSource(new MockSourceFunction())
.map(data -> JSONObject.parseObject(data));
// 按照mid分组,新老用户修正
SingleOutputStreamOperator<JSONObject> jsonWithNewFlagDS = jsonobjDS
.keyBy(data -> data.getJSONObject(“common”).getString(“mid”))
.map(new NewMidRichMapFunc());
// 过滤出 页面数据
SingleOutputStreamOperator<JSONObject> pageObjDS = jsonWithNewFlagDS.filter(data -> StringUtils.isEmpty(data.getString(“start”)));
// 按照mid分组,过滤掉不是今天第一次访问的数据
SingleOutputStreamOperator<JSONObject> uvDS = pageObjDS
.keyBy(jsonObj -> jsonObj.getJSONObject(“common”).getString(“mid”))
.filter(new UvRichFilterFunction());
// 统计最近1小时的uv,1秒更新一次
SingleOutputStreamOperator<Long> uvOneDS = uvDS
.map(r -> 1L);
ParameterTool parameterTool = ParameterTool.fromArgs(args);
boolean isSlidingSplit = parameterTool.getBoolean(“sliding-split”, false);
if (isSlidingSplit) {
uvOneDS
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.reduce(
(value1, value2) -> value1 + value2,
new SplitTumbleWindowPAWF()
)
.keyBy(r -> 1)
.process(new SplitWindowAggFunction()).setParallelism(1)
.print().setParallelism(1);
} else {
uvOneDS
.windowAll(SlidingProcessingTimeWindows.of(Time.hours(1), Time.seconds(1)))
.reduce(
(value1, value2) -> value1 + value2,
new SplitTumbleWindowPAWF())
.print().setParallelism(1);
}
env.execute();
SplitTumbleWindowPAWF类:
代码提交:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SlideWindowDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
–sliding-split false
可以看见,任务非常繁忙。
4)时间分片案例
提交案例:统计最近1小时的uv,1秒更新一次(滚动窗口+状态存储)
核心逻辑代码:
代码提交:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SlideWindowDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
–sliding-split true
Flink 1.13对SQL模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/33655.html