联邦学习作为新一代人工智能基础技术,通过解决数据隐私与数据孤岛问题,重塑金融、医疗、城市安防等领域。

腾讯 Angel PowerFL 联邦学习平台构建在 Angel 机器学习平台上,利用 Angel-­PS 支持万亿级模型训练的能力,将很多在 Worker 上的计算提升到 PS(参数服务器) 端;Angel PowerFL 为联邦学习算法提供了计算、加密、存储、状态同步等基本操作接口,通过流程调度模块协调参与方任务执行状态,而通信模块完成了任务训练过程中所有数据的传输。Angel PowerFL 联邦学习已经在腾讯金融云、腾讯广告联合建模等业务中开始落地,并取得初步的效果。

Angel 机器学习平台:github.com/Angel-ML

 

 

 

Angel PowerFL 对联邦通信服务的要求

Angel PowerFL 联邦学习平台在训练任务过程当中,对参与方之间的消息通信要求极高,要求消息系统必须稳定可靠、保持高性能且能保证数据安全。Angel PowerFL 的学习任务在训练过程当中,参与方之间会有大量的加密数据通过通信模块传输,Angel PowerFL 对通信服务有以下需求:

➡️ 稳定可靠

Angel PowerFL 的学习任务时长从几分钟到几小时,算法执行对数据的准确性要求很高,不同算法的数据传输峰值也不一样,这需要通信模块的服务足够稳定,并且不能丢数据。

➡️ 高性能传输

Angel PowerFL 底层通过 Spark 进行计算,Executor 并发执行会产生很多待传输的中间数据,通信模块需要将这些加密后的数据及时传输给对方,这就要求通信服务做到低延时、高吞吐量。

➡️ 数据安全

虽然 Angel PowerFL 所有数据都通过加密模块进行了加密,但参与联邦学习的业务可能分布在不同公司;跨公网进行传输,需要通信模块足够安全,不易被攻击。

为什么选择 Pulsar

联邦通信服务在做技术预研的时候,考虑过 RPC 直连、HDFS 同步、MQ 同步三种技术方案。考虑到对安全和性能的要求比较高,排除了 RPC 直连和 HDFS 同步方案,确定采用 MQ 同步方案。

MQ 可选的服务很多,比如 Pulsar、Kafka、RabbitMQ、TubeMQ 等。考虑到 Angel PowerFL 对稳定性、可靠性、高性能传输和数据安全有很高的需求,我们咨询了腾讯数据平台部 MQ 团队,他们向我们推荐了 Pulsar。

随后,我们对 Pulsar 开展了深入调研,发现 Pulsar 内置的诸多特性,正好满足了我们对消息系统的要求。Pulsar broker 和 bookie 采用了计算存储分层架构,保证了数据稳定可靠,性能良好;Pulsar 支持跨地域复制(geo­-replication),解决了 PowerFL 跨联邦同步 MQ 问题;而 Pulsar 的验证和授权模式也能保证传输安全。

云原生的计算与存储分层架构

Apache Pulsar 是下一代云原生分布式消息和事件流平台,采用了计算和存储分层的架构:在 Broker 上进行 Pub/Sub 相关的计算,在 Apache BookKeeper 上存储数据。

和传统的消息平台(如 Kafka)相比,这种架构有明显的优势:

  • Broker 和 bookie 相互独立,可以独立扩展和容错,提升系统的可用性。
  • 分区存储不受单个节点存储容量的限制,数据分布更均匀。
  • BookKeeper 存储安全可靠,保证消息不丢失,同时支持批量刷盘以获得更高吞吐量。

 

 

Pulsar Geo­-replication

Pulsar 原生支持跨地域复制(Geo­-replication),可以在多个数据中心的多个 Pulsar 集群中同时同步/异步复制数据。还可以在消息级别,通过 setReplicationClusters 控制消息复制到哪些集群。

 

 

 

在上图中,无论 Producer P1、P2 和 P3 在什么时候分别将消息发布给 Cluster A、Cluster B 和 Cluster C 中的 topic T1,这些消息均会立刻复制到整个集群。一旦完成复制,Consumer C1 和 C2 即可从自己所在的集群消费这些消息。

水平扩展

由于 Pulsar 的存储设计基于分片,Pulsar 把主题分区划分为更小的块,称其为分片。每个分片都作为 Apache BookKeeper ledger 来存储,这样构成分区的分片集合分布在 Apache BookKeeper 集群中。这样设计方便我们管理容量和水平扩展,并且满足高吞吐量的需求。

  • 容量管理简单:主题分区的容量可以扩展至整个 BookKeeper 集群的容量,不受单个节点容量的限制。
  • 扩容简单:扩容无需重新平衡或复制数据。添加新存储节点时,新节点仅用于新分片或其副本,Pulsar 自动平衡分片分布和集群中的流量。
  • 高吞吐量:写入流量分布在存储层中,不会出现分区写入争用单个节点资源的情况。

经过深入调研后,我们决定在腾讯 Angel PowerFL 联邦学习平台上使用 Apache Pulsar。

基于 Apache Pulsar 的联邦通信方案

联邦学习的各个业务(Angel PowerFL 称之为 Party,每个 Party 有不同的 ID,如 10000/20000),可能分布在同个公司的不同部门(无网络隔离),也可能分布在不同公司(跨公网),各个 Party 之间通过 Pulsar 跨地域复制功能进行同步复制,总体设计方案如下:

 

 

 

联邦学习的每个训练任务,通过消息的 producer 和 consumer 连接所在 Party 的 Pulsar 集群,集群名以 fl-pulsar-[partyID] 进行区分,训练任务产生需要传输的中间数据后,生产者将这些数据发送给本地 Pulsar 集群。

Pulsar 集群收到数据后,通过 Pulsar proxy 建立的同步复制网络通道,将数据发送给使用方 Party。而使用方 Party 的消费者,会一直监听该训练任务对应的 topic,当有数据到达后,直接消费数据进行下一步的计算。

 

 

 

在 Angel PowerFL 执行训练任务时,driver 和每个 partition 会创建一个 channel 类型变量,该变量和 Pulsar 当中具体的 topic 一一对应,需要交换的数据都会经过生产者发送到这个 topic。

Angel PowerFL 支持多方联邦,因此会有 2+ 个 Pulsar 集群需要同步复制数据。每个联邦学习任务通过各自的 parties 任务参数指定了参与方,生产者在发送消息时调用setReplicationClusters接口,确保数据只在参与 Party 之间传输。

在 Angel PowerFL 的通信模块中,我们充分利用了 Pulsar 的 geo-­replication、topic 限流、Token Authentication 等功能。下面我来详细介绍如何在 Angel PowerFL 联邦学习平台中使用 Pulsar。

Geo­-replication 去掉Global ZooKeeper 依赖

在 Angel PowerFL 联邦学习平台上,部署一套完整的 Pulsar 依赖两个 ZooKeeper 集群,分别是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 作用类似,用来存储元数据。而 Global ZooKeeper 则在 Pulsar 多个集群间***享配置信息。

 

 

 

在 Angel PowerFL 场景中,每个 Party 加入前,都要先部署一个 Global ZooKeeper 的子节点,或者共用一套跨公司或跨地域的公共 ZooKeeper,这样不仅会增加部署的难度,也会增加被攻击的风险,不利于新 Party 加入。

Global ZooKeeper 中存储的元数据,主要是集群名/服务地址/namespace 权限等信息。Pulsar 支持创建和加入新集群。我们通过以下两个步骤注册联邦 Pulsar 集群的信息到 local ZooKeeper,就去除了对 Global ZooKeeper 的依赖:

步骤 1: 注册新加入 Party 的 Pulsar 集群

# OTHER_CLUSTER_NAME 为待注册 Party 的 Pulsar 集群名
# OTHER_CLUSTER_BROKER_URL为 Pulsar 集群对应的 broker 地址
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} 
 --url http://${OTHER_CLUSTER_HTTP_URL} 
 --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}


复制代码

步骤 2: 授予训练用到的 namespace 访问集群权限

./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} 
 -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}


复制代码

对于新加入的 Party,只用提供与其对应的 Pulsar 的集群名/服务地址即可完成注册,geo-replication 就可以通过注册信息同步复制数据。

Client 增加 Token 认证

Pulsar 作为 Angel PowerFL 的通信模块,没有加入用户级别的权限控制。为了进一步保证 client 生产和消费数据的安全,我们参考 Pulsar Client authentication using tokens based on JSON Web Tokens 增加了 token 认证,Angel PowerFL 的训练任务除了配置当前 Party 使用的服务地址外,还需要配置 admin token。

pulsar.apache.org/doc...
由于 Angel PowerFL 整套系统部署在 Kubernetes 上,我们通过容器准备 Pulsar 集群需要的 Public/Private keys 等文件,然后注册到 K8S secret 中。

# 生成 fl-private.key 和 fl-public.key
docker run --rm -v "$(pwd)":/tmp 
 apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create-key-pair --output-private-key 
 /tmp/fl-private.key --output-public-key /tmp/fl-public.key
# 生成 admin-token.txt token 文件
echo -n `docker run --rm -v 
 "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create --private-key 
 file:///tmp/fl-private.key --subject admin`
# 将认证相关的文件注册到 K8S
kubectl create secret generic token-symmetric-key 
 --from-file=TOKEN=admin-token.txt 
 --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}


复制代码

开启多集群 topic 自动回收

Pulsar 集群开启了 geo-­replication 功能后,无法通过命令直接删除用过的 topic,而 Angel PowerFL 训练任务每次使用的任务是一次性的,任务结束后这些 topic 就没用了,如果不及时删除会出现大量累积。

对于通过 geo­-replication 开启复制的 topic,可以配置brokerDeleteInactivetopicsEnabled参数,开启 topic 自动回收。自动回收无用的 topic,需满足以下几个条件:

  • 当前 topic 没有生产者( producer)或者消费者(consumer)连接
  • 当前 topic 没有被订阅
  • 当前 topic 没有需要保留的信息

Angel PowerFL 部署的 Pulsar 集群,通过 brokerDeleteInactivetopicsEnabled 开启 topic 自动回收。在执行训练任务的过程中,使用后对每个 topic 按回收条件进行处理。同时,我们增加了

brokerDeleteInactivetopicsFrequencySeconds 配置,将回收的频率设置为 3 小时。

优化 topic 限流

Angel PowerFL 中的训练任务,在不同的数据集/算法/执行阶段,生产数据的流量峰值也不同。目前生产环境中单个任务最大的数据量超过 200G/小时。训练过程中,如果 Pulsar 连接中断或者生产和消费过程出现异常,需要重新开始整个训练任务。

为了规避 Pulsar 集群被单个训练任务冲垮的风险,我们使用了 Pulsar 的限流功能。Pulsar 支持 message-rate 和 byte-rate 两种生产限流策略,前者限制每秒生产消息的数量,后者限制每秒生产消息的大小。Angel PowerFL 将数据切分成多个 4M 的消息,通过 message-­rate 限制生产消息的数量。在 Angel PowerFL 中,我们将 namespace 的消息限制为 30 条(小于<30*4=120M/s):

./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30

刚开始测试 message-rate 的限流功能时,出现了限不住的情况(限流设置失效)。腾讯数据平台部 MQ 团队负责 Pulsar 的同事帮忙一起排查,发现设置 topicPublisherThrottlingTickTimeMillis 参数后,限制不能生效。

因此我们想办法在 broker 端启用了精确的 topic 发布频率限制,优化了限流功能并贡献回社区,详情见 PR-7078: introduce precise topic publish rate limiting。
github.com/apache/pul.…

优化 topic unloading 配置

Pulsar 根据 broker 集群负载状况,可以将 topic 动态分配到 broker上。如果拥有该 topic 的broker 宕机,或者拥有该 topic 的 broker 负载过大,则该 topic 会立即重新分配给另一个 broker ;而重新分配的过程就是 topic 的 unloading,该操作意味着关闭 topic,释放所有者(owner)。

理论上,topic unloading 由负载均衡调整,客户端将经历极小的延迟抖动,通常耗时 10ms 左右。但 Angel PowerFL 初期在执行训练任务时,日志爆出大量因为 unloading topic 导致的连接异常。日志显示 topic unloading 在不断的重试,但都不成功:

[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s


复制代码

先来看 broker/namespace/bundle/topic 这四者的关系。Bundle 是 Pulsar namespace 的一个分片机制,namespace 被分片为 bundle 列表,每个 bundle 包含 namespace 的整个哈希范围的一部分。Topic 不直接分配给 broker,而是通过计算 topic 的哈希码将 topic 分配给特定的 bundle;每个 bundle 互相独立,再被分配到不同的 broker 上。

Angel PowerFL 早期的任务 topic 没有复用,一个 LR 算法训练任务创建了 2000 多个 topic,每个 topic 生产的数据负载也不同,我们判断上述断连问题是由于短时间内(最小任务十分钟内能结束,同时会有多个任务在运行)大量创建和使用 topic,导致负载不均衡,topic unloading 频繁发生。为了降低 topic unloading 的频率,我们调整了 Pulsar Bundle 的相关参数:

# 增加 broker 可最大分配 topic 数量
loadBalancerBrokerMaxTopics=500000
# 启用自动拆分namespace bundle
loadBalancerAutoBundleSplitEnabled=true
# 增加触发拆分 bundle 的 topic 数量
loadBalancerNamespaceBundleMaxTopics=10000
# 增加触发拆分 bundle 的消息数
loadBalancerNamespaceBundleMaxMsgRate=10000


复制代码

同时,在创建 namespace 时,把 bundle 数量默认设置为 64。

./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64


复制代码

经过以上调整,Angel PowerFL 在任务执行期间没有再出现过由于 topic unloading 导致的断连。

Pulsar on Kubernetes

Angel PowerFL 的所有服务均通过 Helm 部署在 Kubernetes 上。Pulsar 作为其中的一个 chart,可以很好的利用 K8S 的资源隔离、快速扩缩容等特性。在 Angel PowerFL 使用 Helm 部署 Pulsar 的实践中,我们总结了以下经验:

🎙️ 使用 Local Persistent Volume 作为存储

Pulsar 是 IO 敏感的服务,尤其 bookie 组件,在生产环境中建议使用 SSD 或独立的磁盘。Angel PowerFL 在跑一些大数据集任务时,Pulsar 经常出现 “No Bookies Available” 的异常。这期间磁盘的 IO 使用率很高。

我们通过 Local Persistent Volume 将 bookie 和 ZooKeeper 等其它组件挂载到单独的磁盘,减缓了磁盘 IO 竞争。我们也测试过将 Pulsar 的 PV 存储换成 Ceph 和 NFS,性能都没有直接使用 Local Persistent Volume 好。

🎙️ 使用 NodeSelector

Geo-replication 同步复制数据期间,broker 需要访问对方的 Pulsar proxy 容器。Angel PowerFL 将网关机单独打了标签,通过 NodeSelector 将 broker 安装在可访问外网的网关机上。

🎙️ 配置 useHostNameAsBookieID

Bookie 是有状态的组件,为了 bookie pod 重建后服务正常,需要配置 useHostNameAsBookieID,确保向 ZooKeeper 注册的 ID 是 pod 的 hostname。

未来计划

Angel PowerFL 目前使用 Pulsar 快一年了,稳定运行时间最长的集群已经超过半年,未来对Pulsar 的使用计划主要有两个。

👍 升级 Pulsar 到 2.6.x 版本

我们目前使用的是 Pulsar 2.5.2 版本,由于最近会使用 Pulsar Key_Shared 功能做 Angel-PS 的容灾恢复。2.6.0 版本刚好有增强 Key_Shared 订阅模式,所以我们预计未来一个月升级到 Pulsar 2.6.x。
github.com/apache/pul.…

👍 Pulsar on K8S 支持多磁盘挂载

Angel PowerFL 所有服务都运行在 Kubernetes 上(除了任务使用的 YARN 计算资源),Pulsar 作为其中的一个 chart 和其它服务一起部署,使用 Local Persistent Volume 作为存储。但目前 bookie 只支持挂载一块磁盘(目录),对于多磁盘的机器没有更充分的利用,我们计划增加该特性。

总结

我们介绍了在人工智能应用场景下,使用 Pulsar 作为 Angel PowerFL 通信模块的相关实践。在方案实现过程当中,我们充分使用了 Pulsar 诸多内置特性,并根据自身需求做了相关优化,如 geo-­replication 去掉 Global ZooKeeper 依赖,为 client 增加 token 认证,开启多集群 topic 自动回收,优化 topic 限流功能和 topic unloading 配置等。

Pulsar 作为下一代云原生分布式消息和流平台,有众多吸引人的功能,在直播与短视频、零售与电子商务、媒体、金融等行业有广泛应用,期待 Pulsar 在不同的应用场景下不断有新的案例落地。