什么是广播状态
Broadcast State是Flink支持的一种Operator State。使用Broadcast State,可以在Flink程序的一个Stream中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个Task中,使得这些数据记录能够为所有的Task所共享,比如一些用于配置的数据记录。这样,每个Task在处理其所对应的Stream中记录的时候,读取这些配置,来满足实际数据处理需要。
另外,在一定程度上,Broadcast State能够使得Flink Job在运行过程中与外部的其他系统解耦合。比如,通常Flink会使用YARN来管理计算资源,使用Broadcast State就可以不用直接连接MySQL数据库读取相关配置信息了,也无需对MySQL做额外的授权操作。因为在一些场景下,会使用Flink on YARN部署模式,将Flink Job运行的资源申请和释放交给YARN去管理,那么就存在Hadoop集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如MySQL等进行连接操作授权,如果忘记对MysQL访问授权,Flink Job被调度到新增的某个新增节点上连接并读取MySQL配置信息就会出错。
广播简单例子
连接ALERT_RULE广播,接收到字符串为A或B或C的猜输出,这里ALERT_RULE写死了strings集合,正常来说应该是一个数据源。
final static MapStateDescriptor<String, String> ALERT_RULE = new MapStateDescriptor<>(
"alert_rule",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
List<String> strings = Arrays.asList("A", "B", "C");
env.socketTextStream("127.0.0.1", 9200)
.connect(env.fromCollection(strings).broadcast(ALERT_RULE))
.process(new BroadcastProcessFunction<String, String, String>() {
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(ALERT_RULE);
if (broadcastState.contains(value)) {
out.collect(value);
}
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(ALERT_RULE);
broadcastState.put(value, value);
}
})
.print();
env.execute();
}


京公网安备 11010502036488号