支付宝如何优化TensorFlow集群,解决两大痛点?

  导语:ElasticDL是一个基于TensorFlow2.x和Kubernetes的开源的分布式深度学习编程框架。2019年秋天的GoogleDeveloperDay活动中来自蚂蚁金服的ElasticDL团队展示了ElasticDL的第一个开源版本。本文更新这大半年来ElasticDL项目的进展,尤其是性能优化和业务落地。

  ElasticDL的首要设计意图是简化分布式编程。它允许用户只提供用TensorFlow2.0API描述的模型,而不需要用户写分布式训练过程代码。用户的模型定义只要能在本地调通,即可在分布式环境下用大规模数据训练模型,从而提升研发效率。

  同时,ElasticDL提供的弹性调度的能力在实践中可以让集群的利用高达90%。当集群资源不足时,一个训练作业里的进程减少;当其他作业结束释放资源后,进程数量随之增加。这样的做法比TensorFlowDistributionStrategy专注容错(进程减少的情况下作业不失败,但不会增加进程数量)更进一步。并且,因为ElasticDL作业容忍变化的worker数量,所以每个作业的启动都不必等待集群有足够的资源,而是可以见缝插针的尽早开始训练,从而缩短等待作业启动的时间,让研发人员可以尽快看到第一个迭代的结果,万一分布式训练有问题,也能尽早发现,从而进一步提升了研发效率。

  简化分布式深度学习编程

  为了从海量数据中学习规律,我们需要编写分布式深度学习程序来完成训练任务。这在工业场景中尤为常见。

  可分布式深度学习程序的编写很难——编程者既要了解深度学习,也要了解分布式系统开发。在一个分布式深度学习系统中,需要启动和监控若干个workers。因为既要拆分训练数据给workers,还要综合各个worker算出的gradients来更新模型,所以涉及通信(Communication)和同步(Synchronization)。此外,当worker数目很多时,作业在执行过程中有worker挂掉的概率也会变得很大。如果一个worker挂掉,则整个作业重启或者恢复到最近的checkpoint(FaultRecovery),那么重启之后可能又会有worker挂掉导致重启,于是作业不断陷入重启和恢复,永远也无法完成。这进一步要求编程者具备设计容错(FaultTolerance)系统的能力。其实不仅分布式深度学习,其他分布式机器学习程序、分布式离线和在线数据处理程序等各种分布式程序的写作,都对编程者有类似上述要求。

  一个常见的解决思路是为特定类型的作业提供分布式编程框架,让用户只需要完形填空一样补上业务逻辑,而分布式计算(包括通信、同步、和容错)都由框架的代码来完成。一个典型的例子是离线数据处理程序用MapReduce框架来写。不管是GoogleMapReduce还是HadoopMapReduce,用户基本都只需填写map和reduce两个函数的实现即可。类似的,在线数据流系统基于Storm和Flink来写,用户只需提供bolts和nuts这样的业务逻辑定义。

  在ElasticDL之前,蚂蚁金服的同事们使用过多种框架和类似框架的高层API。这些方案大都基于TensorFlow和Kubernetes。

  ●TensorFlowEstimator作为构建在TensorFlow之上的一层API,允许用户只需定义模型,而训练过程封装在一个函数调用里。利用Kubeflow提供的TFoperator,我们可以将该训练过程以分布式作业的方式启动在Kubernetes上。这个方案的局限是:它仅支持TensorFlow的graphmode,不支持eagerexecution;而eagerexecution可以大幅简化调试,尤其方便跟踪网络各层输出。

  ●KerasAPI支持TensorFlow2.x和eagerexecution。目前TensorFlow2.xKerasAPI还暂不支持ParameterServer分布式策略,对AllReduce分布式策略提供了实验性的支持。

  ●Horovod对用户代码有侵入性,用户除了必须熟悉TensorFlowAPI之外,还需学习HorovodAPI。

  以上三个方案的共同局限是,虽然具备一定的容错能力,不过不支持弹性调度。而且它们都依赖部署Kubernetesoperator,了解Kubernetes对AI专家来说颇有挑战。

  针对这些局限,我们设计和开发了ElasticDL分布式计算框架。用户定义可以用TensorFlow2.x的KerasAPI来定义模型。并且,分布式执行不要求Kubernetes集群有任何特殊配置,而是利用每个作业里的master进程来协调训练数据分配、通信、同步和容错——这也是ElasticDL除了容错,支持弹性调度的原因。

  基于ElasticDL框架的编程

  就像MapReduce框架中只需要用户完形填空两个函数:map和reduce,ElasticDL需要用户填写forward、loss、optimizer、feed函数。其中forward定义深度学习的前向计算过程(ForwardPass),ElasticDL会调用TensorFloweagerexecution的GradientTape机制来自动推导对应的后向计算过程(BackwardPass);loss函数返回模型训练时使用的损失函数;optimizer函数返回模型训练时使用的优化器;feed定制化训练数据到TensorFlow模型输入(tensors)的转换过程。

  所有这些函数的编程只需要了解TensorFlowAPI,不需要对分布式训练有任何背景知识。写完之后,用户可以在单机上用小数据做调试验证。如果通过,可以不做任何代码修改就提交到Kubernetes集群上做分布式的容错的大规模训练。

  不同于Kubeflow/TF-operator给每个集群部署一个KubernetesOperator的方式,ElasticDL为每个作业引入一个master进程。通过调用KubernetesAPI,master进程了解集群情况;同时,作为作业的一部分,master还了解深度学习作业的特点——包括利用Pythoninspection机制了解上述各个函数的特点,其中调用的API函数等。所以,master有非常充分的信息来做更优的调度。比如master可以请Kubernetes把两个worker启动在同一台物理机上,共用一个GPU——当一个worker读数据的时候,请另外一个worker来做计算,从而始终保持较高的GPU利用率。

  一个例子

  我们用一个MNIST手写数字识别的例子来说明。

  defforward():inputs=tf.keras.Input(shape=(28,28),name="image")x=tf.keras.layers.Reshape((28,28,1))(inputs)x=tf.keras.layers.Conv2D(32,kernel_size=(3,3),activation="relu")(x)x=tf.keras.layers.Conv2D(64,kernel_size=(3,3),activation="relu")(x)x=tf.keras.layers.BatchNormalization()(x)x=tf.keras.layers.MaxPooling2D(pool_size=(2,2))(x)x=tf.keras.layers.Dropout(0.25)(x)x=tf.keras.layers.Flatten()(x)outputs=tf.keras.layers.Dense(10)(x)returntf.keras.Model(inputs=inputs,outputs=outputs,name="mnist_model")

  除了模型定义之外,用户还需要指定feed,loss,optimizer函数。

  defloss(labels,predictions):labels=tf.reshape(labels,[-1])returntf.reduce_mean(input_tensor=tf.nn.sparse_softmax_cross_entropy_with_logits(logits=predictions,labels=labels))defoptimizer(lr=0.1):returntf.optimizers.SGD(lr)deffeed(dataset,mode,_):def_parse_data(record):ifmode==Mode.PREDICTION:feature_description={"image":tf.io.FixedLenFeature([28,28],tf.float32)}else:feature_description={"image":tf.io.FixedLenFeature([28,28],tf.float32),"label":tf.io.FixedLenFeature([1],tf.int64),}r=tf.io.parse_single_example(record,feature_description)features={"image":tf.math.divide(tf.cast(r["image"],tf.float32),255.0)}ifmode==Mode.PREDICTION:returnfeatureselse:returnfeatures,tf.cast(r["label"],tf.int32)dataset=dataset.map(_parse_data)ifmode==Mode.TRAINING:dataset=dataset.shuffle(buffer_size=1024)returndataset

  上述每个函数都很容易做单独测试(unittest)。而且,利用TensorFlow2.xeagerexecution,上述函数很容易log每一层的输出。基于个特点,ElasticDLworker在调用forward函数的时候,可以打印中间结果,便于调试和复现问题。

  ElasticDL的弹性训练过程

  给定上述模型定义,ElasticDL的master进程按照asynchronous或者synchronousSGD方法,协调workers来做训练。当使用asynchronousSGD方法时,master会启动一个高性能的parameterserver,供各个workers使用。当使用synchronousSGD时,ElasticDL使用和才云科技合作研发的一个Kubernetes-native的fault-tolerableAllReduce实现FTlib。

  1、Master负责动态数据划分

  弹性训练过程的一个容易被忽略的前提是动态数据划分(Dynamicdatapartitioning)。在用MPI写分布式程序的时候,因为作业中进程数量是恒定的,所以经常采用静态数据划分的做法——在训练之前把训练数据预先分成N个文件,对应作业中的N个worker进程。这个做法在弹性调度的时候就失效了——因为弹性调度时,作业中的进程数量是可变的。为此,需要实现动态数据划分。

  ElasticDL的动态数据划分是基于索引的。ElasticDL要求训练数据是一个或者多个RecordIO格式的文件,或者是MaxCompute数据库系统中的表(table)。这两种数据源都允许master进程在开始训练之前,在基本存储单元(block)间快速跳跃着扫描数据,把数据分成小段,称之为任务(task)。每个task包括的内容如下:

  1.文件名或者表名,

  2.第一条记录相对于文件(或者表)开始处的偏移(offset),

  3.这个task里的总记录数。

  扫描结果是很多tasks,master把这些tasks放进一个TODO队列里。这个队列不一定需要是master进程里的数据结构,可以是放在etcd里的——因为etcd是不死的,所以master即使被高优先级作业抢占了,这个信息也不会丢失;可以通过在资源富余时重启master进程来恢复作业状态。

  扫描和划分数据的同时,master开始请Kubernetes启动workers,总数不超过用户指定的数量N(最大并发度)。每当一个worker启动起来了,master会收到Kubernetes发来的通知;master在一个etcd数据结构里记录“活着”的workers。

  扫描和划分数据结束之后,master就依次从TODO队列里取出task,通过gRPC发给某一个活着的worker,同时master把这个task挪进DOING队列里。接收到task的worker负责打开文件(或者表),并且从指定的offset开始依次读取记录,并且更新本地模型。根据用户选择的asynchronous或者synchronous算法,workers会通过调用parameterserver或者AllReduce来协调更新全局模型。

  当一个worker处理完了接收到的task,它通过gRPC返回一个表示成功的标记;master就把这个task从DOING队列挪到DONE队列了。当所有task都从TODO挪进了DONE,则说明一个epoch完成了。

  如果一个worker失败了(比如被更高优先级作业抢占了),则master的gRPCcall会timeout;此时,master把对应的task从DOING队列挪回TODO队列了。下一次有worker完成task时,master会把这个task再发出去。这里有一个细节:有的task可能被某个worker使用了一部分,也因此影响到了模型更新;此时worker被抢占,那么这部分已经被处理的数据会因为task的下一次分发,被重复使用。不过这个并不影响机器学习训练要求数据统计一致性的假设。而且其他动态数据划分方法造成的数据复用情况可能更严重。

  2、Worker调用TensorFlowEagerExecution

  ElasticDLworker接收到的一个task通常包括多个minibatches。对于每个task,worker打开对应的文件或者表,随后做如下操作:

  1.读取一个mini-batch的训练数据。

  2.用本地模型(localmodel)作为参数调用用户定义的forward函数以计算cost。如果模型很大,则部分参数可能来自于parameterserver。

  3.给定cost,worker利用TensorFloweagerexecution的GradientTape机制,进行backward计算,得到梯度(gradient)。

  4.如果是synchronousSGD,此时worker调用AllReduce实现FTlib来同步gradients并且更新模型。如果是asynchronousSGD,worker不定时的向parameterserver上传gradients,也不定时地从parameterserver获取全局模型参数。

  3、高效训练的优化

  相对于2019年秋季ElasticDL在GoogleDeveloperDay上亮相时的状态,最近几个月ElasticDL项目针对性能优化做了很多工作。当时ElasticDL使用Redis作为parameterserver。现在有了自己的用Go语言写的parameterserver。相对于Redis,ElasticDLparameterserver可以做一些深度学习计算,从而减少worker和parameterserver之间通信的次数。

  这个变化和其他优化工作一起让同样的训练作业,总体训练时间下降了约13倍。最近一个基于DeepFM模型的试验展示,用两个parameterserver进程和四个workers进程来训练,10个epochs的总体时间从1350秒(ElasticDL的2019年9月版本)下降到106秒(2020年2月版本)。这些优化策略包括:

  ●在parameterserver上惰性初始化(lazyinitialize)embeddingvectors——在使用到vector的时候才初始化。

  ●把一个embeddingtable拆分到多个parameterserver进程里以均衡存储与通信负载。

  ●worker从PS请求embeddingvectors时,先滤除重复的embeddingID,只取回不同ID的vectors,从而减少通信量。

  ●worker向PS发送梯度时,先把相同ID的梯度进行合并(调用TensorFlow的embeddingvectorcombination函数),从而减少通信量。

  弹性调度提升集群利用率

  ElasticDL实现的弹性调度和刚性调度(GangScheduling)是对应的。刚性调度的简洁不求甚解的描述是:一个作业里的n个进程,运行时如果有一个进程挂了(比如被更高优先级的作业抢占了资源),则整个作业挂掉。等资源足够再启动所有的n个进程了,则可以重启(或者从最近的checkpoint恢复)。

  上文提到的几种分布式运行TensorFlow作业的方式都使用了Kubeflow项目提供的Kubernetesoperators,支持在Kubernetes上分布式地运行TensorFlow作业。因为TensorFlowruntime目前支持一定程度的容错,所以作业执行过程中,如果有一些workers挂了,剩下的可以继续。不过不支持因为日后资源富余,恢复workers数量。XGBoost、MXNet社区也习惯于复用Kubeflow的Kubernetesoperator。用MPI写的程序也可以用Kubeflow拉起。

  而弹性调度(ElasticScheduling)实现的是训练作业运行过程中,进程数量的变化不影响作业进行。具体的说,如果一个或者几个进程被高优先级的作业抢占,剩下的进程不受影响地继续进行。如果将来资源丰沛了,系统可以加几个进程,此时作业仍然不受影响地继续运行。

  上文简述了ElasticDL实现弹性调度的机制,包括动态数据分配以及由master来启动、监控、和管理workers,而不依赖Kubernetesoperator。本节展示三个benchmark试验,帮助大家直观地了解ElasticDL对集群利用率和研发效率的同时提升。

  实验一:多个AI训练作业并发

  考虑两个AI训练作业需要的资源总和略超过集群的情况:如果没有elasticscheduling,则两个作业顺序执行。第二个作业的发起人需要等很久——用户体验不好。并且任何时刻只有一个作业在运行——集群资源用不满。而如果有elasticscheduling,则两个作业并发执行,虽然后启动的作业拿不到期待的全部资源,但是也马上就开始执行了——用户体验好,而且因为作业并发集群被用满。

  我们做了一个实验来验证上述好处。这个实验可以在ASI集群和开源Kubernetes集群上复现。实验结果如下图。

  Figure1:overlapjobs

  上图对应的实验里,我们用gangscheduling的方式提交了两个训练作业,每个作业都需要13个CPU。而GoogleCloud上租用的实验集群总CPU数是24,不足同时运行两个作业,所以依次运行它们。可以看到第一个作业在395秒时结束。随后集群花了一点时间调度,然后开始运行第二个作业,直到795秒时结束。

  下图对应的实验里,我们用ElasticDL来执行同样的两个训练作业。第一个作业提交之后的30秒,我们提交了第二个作业。第二个作业马上就开始运行,用满了集群剩下的资源,而不需要等到第一个作业结束。在395秒时,第一个作业结束。随后,在580秒时,第二个作业也结束了。因为弹性调度,使得两个作业尽量同时运行,所以总结束时间比也上图要早。

  总结:

  ●用户等待作业启动时间几乎是0。这对于AI工作很重要,因为用户最关注的是第一个迭代尽快开始——如果第一个迭代失败了,很可能是用户程序的bug。另外,深度学习模型往往需要手动调优,学习率、optimizer、activation等配置如果不合理,往往在前几个迭代就能发现;因此第一个迭代能立刻开始,对模型调优的工作效率提高有很大帮助。

  ●集群利用率高。第二个实验(elasticscheduling)执行期间,有一段时间集群利用率是100%;其他时间也不低于第一个实验(gangscheduling)。

  ●作业完成更快。第二个试验里,两个作业用了约580秒;第一个实验里需要约795秒。

  实验二:AI作业和在线服务混布

  运行各种在线服务的生产集群,通常需要留出余量资源,以应付突然增长的用户请求量。我们希望利用这些“余量”来做AI训练,从而提升集群利用率。下面实验验证:通过用较低优先级运行ElasticDL训练作业,在用户请求增加的时候,Kubernetes自动扩容在线服务(NGINX);此时ElasticDL作业自动释放资源,配合在线服务的扩容。当流量高峰过去之后,Kubernetes自动缩容NGINX服务,此时,ElasticDL自动利用释放的资源。

  Figure2:autoreact

  图中紫色曲线是NGINX服务使用的CPU数量,随用户请求数量变化。绿色曲线是ElasticDL训练作业使用的CPU数量,随NGINX的资源需求自动变化。蓝色曲线是集群的总体资源利用率——保持在90%以上。

  实验三:训练时更改worker数量不影响收敛性

  有用户担心训练过程中worker的数量发生变化,会导致不收敛。实际情况下从未发生这类问题。用ElasticDL和用gangscheduling分别训练Wide&Deepmodel和xDeepFMmodel,收敛曲线如下:

  Figure3:wide-n-deeptrainingconverges

  Figure4:xdeepfmtrainingconverges

  可以看到,采用gangscheduling持续用4个或者8个workers,和用ElasticDL并且worker数量在4到8之间变化,得到的收敛曲线很难分辨。差别在自然误差范围之内。

  总结

  蚂蚁金服从事的金融行业涉及支付、微贷、和保险等业务。和搜索、广告、推荐不同,金融业务的流程要复杂得多——包括对用户信用的预判以及和其他金融机构的联动——每一个用户请求对应很多处理步骤;而搜索、广告、推荐业务里针对每个用户请求的AI处理步骤少得多。行业特点导致蚂蚁金服要训练的模型的类型繁多,呈现更长尾的特点。也对工具提升研发效率提出了高要求。ElasticDL正是针对这些特点设计的。

  同时,对集群的利用率提升是各行各业都关注的。在很多公司和行业,AI集群的利用率通常在30%以下。当通过全面实现弹性调度,把集群利用率提升到90%左右时,相当于空手套白狼地把集群规模扩大了为原来的三倍多。因此节省的硬件投资可能高达数千万甚至数亿元人民币。

  ElasticDL的设计和实现依托了TensorFlow2.x提供的高效率的模型描述API。也依赖了TensorFloweagerexecution提供的GradientTape机制——使得ElasticDL可以在不改变TensorFlowruntime的情况下,结合Kubernetes实现彻底的弹性调度(进程数可增也可减),从而实现了减少作业启动的等待时间,提升集群利用率,和提升研发效率的效果。

  目前ElasticDL在阿里系结合PAI平台在推广。PAI平台提供的拖拽式编程模式进一步降低了端到端机器学习流程的研发门槛。希望接下来ElasticDL团队可以有更多结合业务实践的分享。