什么是广播状态
Broadcast State是Flink支持的一种Operator State。使用Broadcast State,可以在Flink程序的一个Stream中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个Task中,使得这些数据记录能够为所有的Task所共享,比如一些用于配置的数据记录。这样,每个Task在处理其所对应的Stream中记录的时候,读取这些配置,来满足实际数据处理需要。
另外,在一定程度上,Broadcast State能够使得Flink Job在运行过程中与外部的其他系统解耦合。比如,通常Flink会使用YARN来管理计算资源,使用Broadcast State就可以不用直接连接MySQL数据库读取相关配置信息了,也无需对MySQL做额外的授权操作。因为在一些场景下,会使用Flink on YARN部署模式,将Flink Job运行的资源申请和释放交给YARN去管理,那么就存在Hadoop集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如MySQL等进行连接操作授权,如果忘记对MysQL访问授权,Flink Job被调度到新增的某个新增节点上连接并读取MySQL配置信息就会出错。
广播简单例子
连接ALERT_RULE广播,接收到字符串为A或B或C的猜输出,这里ALERT_RULE写死了strings集合,正常来说应该是一个数据源。
final static MapStateDescriptor<String, String> ALERT_RULE = new MapStateDescriptor<>( "alert_rule", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); List<String> strings = Arrays.asList("A", "B", "C"); env.socketTextStream("127.0.0.1", 9200) .connect(env.fromCollection(strings).broadcast(ALERT_RULE)) .process(new BroadcastProcessFunction<String, String, String>() { @Override public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception { ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(ALERT_RULE); if (broadcastState.contains(value)) { out.collect(value); } } @Override public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { BroadcastState<String, String> broadcastState = ctx.getBroadcastState(ALERT_RULE); broadcastState.put(value, value); } }) .print(); env.execute(); }