MoE指的是sparse mixture of experts,sparse表示推理的时候不是所有的参数都会被激活。通常情况下MoE被认为是一种scaling up模型的技术,使用同样的资源训练更大的模型,某些设定下其效果甚至可能达到与同样参数量稠密网络相当的水平(Deepseek MoE 2B,见论文)。
最近社区里有很多MoE的开源工作,xAI发布了300B的MoE模型,苹果发布了MoE的多模态模型。不禁让人想到一个问题,MoE会是AI的未来吗?这是一个很难回答的问题,从我个人的观点出发,在硬件水平不出现巨大飞跃的前提下,答案是肯定的(Quantum come to recue... i'm waiting)。一方面是因为我相信处在最前沿的模型规模还会呈现大幅的提升,需要有技术来弥补硬件水平和扩大后模型规模之间的差距,而MoE是这方面一项成熟同时具有进一步提升潜力的方法。另外一方面,从神经元活动的分布的角度来看,人脑某些区域也是稀疏的,在进化论的角度也可以被看成一种减少能量消耗的方法。再说一下为什么MoE不会是未来,首先在MoE架构理论中有很多的漏洞,比如训练中需要用辅助loss保持exert激活的均匀性,路由训练过程中会震荡。虽然这些问题都有对应的方法去解决,但这种缝缝补补的技术带来收益的同时也限制了收益的上限(MoE的scaling law中可以体现)。
但这篇博客并不是为了讲MoE技术本身,而是解析一下megatron是如何实现MoE的训练的,以及大规模的MoE模型如何进行并行,同时增加对megatron的了解。
MoE结构回顾
首先,看一下在最主流的transformer框架里,MoE的结构如下图所示:
大多数情况下,MoE层是应用在MLP中的,也就是每个expert代表了一个MLP层。MoE并没有引入新的层,除了一个Router Network用来计算token和expert之间的匹配分数。
看起来MoE模型实现上和稠密网络并没有太大的区别,从计算流程上来看确实是这样,下面介绍一个MoE layer最简单的实现(transformer中的MixtralSparseMoeBlock)的过程:
class MixtralSparseMoeBlock (nn.Module) : def __init__ (self, config) : super().__init__() self.hidden_dim = config.hidden_size self.ffn_dim = config.intermediate_size self.num_experts = config.num_local_experts self.top_k = config.num_experts_per_tok # gating self.gate = nn.Linear(self.hidden_dim, self.num_experts, bias=False ) self.experts = nn.ModuleList([MixtralBlockSparseTop2MLP(config) for _ in range(self.num_experts)]) def forward (self, hidden_states: torch.Tensor) -> torch.Tensor: """ """ # 计算routing score router_logits = self.gate(hidden_states) routing_weights = F.softmax(router_logits, dim=1 , dtype=torch.float) # 根据topk选出每个token对应的激活专家 routing_weights, selected_experts = torch.topk(routing_weights, self.top_k, dim=-1 ) routing_weights /= routing_weights.sum(dim=-1 , keepdim=True ) final_hidden_states = [] # 在每个expert上计算选中的token for expert_idx in range(self.num_experts): expert_layer = self.experts[expert_idx] current_state = select_tokens(hidden_states, selected_experts) if current_state.shape[0 ] == 0 : continue # 计算expert的输出 current_hidden_states = expert_layer(current_state) * routing_weights[top_x, idx, None ] final_hidden_states.append(current_hidden_states) # 重组得到最后输出 final_hidden_states = concat(final_hidden_states, selected_experts) return final_hidden_states, router_logits
注:这里省略了一些不必要的计算细节,只保留对过程理解有用的部分。比如:select_tokens,concat是为了简化而虚构的函数。
上面的实现方法将所有的expert都加载到显存中,并且在计算expert的时候使用串行的方式。显然因为每个token计算的时候并不会激活所有的expert,虽然在实际训练的过程中每次的输入里包含很多个token,也就是可以保证不会有完全空闲的expert。但是expert上的计算压力明显要低于网络中其余部分,从而造成资源浪费。另外,在实际推理部署的时候,expert往往是分布在不同的设备上的,这就涉及多机通讯的问题。并且,router往往还要考虑负载均衡的问题。这些问题都给实现MoE的过程中增加了困难,下面看megatron是如何解决这些问题的。
这里分为3大块去分析整个实现过程,首先是expert的并行方式,其次是Router的设计,最后是Dispatcher。
Expert Parallel
EP(Expert Parallel)是MoE特有的并行方式,其核心是将expert在多个DP模型副本之间共享,从而实现节约显存的目的。可以说没有EP,那么大规模的MoE模型训练是完全不可能实现的。在介绍EP的划分方式之前,我们先来看一下megatron中并行组的划分方式。
Megatron中常规通讯组划分
首先我们来说一下没有EP的时候通讯组是怎么划分的。megatron中有TP,PP,DP的3种并行方式,这3种并行方式对应的通讯量大小排序是:TP>DP>PP。而在GPU集群里,设备内部的通讯带宽远大于设备间的带宽。因此通讯组划分原则就是,尽量让TP和DP通讯发生在设备内部,而PP组进行跨设备通讯。
如果不太理解这个分配方式,可以直接看下面这个例子。我会在示意图里面标记出每个GPU对应的TP DP PP的组,以及上面存放了对应模型的那部分结构。PS:顺便提一下,megatron中的并行组划分代码在写的时候应该是没有考虑可读性,如果想更快的看懂这块代码,在看之前最好也画一个这样的图对照着。
假设,现在有2台8卡的A100机器,需要训练一个8层的transformer模型,并行设置TP=2,PP=4,DP=2。那么示意图如下:
55a024601cf9e85c9b6252a39794ed1
notions*说明
:
,下标表示模型的1~8层参数,上标表示TP的第一个参数切片,方括号里表示第一个模型副本。
事实上,PP组的第一个节点和最后一个节点还需要保存embedding和unembedding的矩阵,这里为了方便省略掉了。
MoE layer通讯组划分
通过上面这个例子我们知道了正常的Dense模型在并行情况下是如何切分的,那如果是MoE模型会是怎么样的呢?我们把上一个例子稍微改动一下,把Dense改为MoE模型,每一层transformer里面有8个expert。设置EP=2,意味着每2个模型副本间共享一个完整的MoE层。
我们以1:2层为例,看一下expert会如何进行保存,为了方便我们用一个表格说明。因为PP=4,所以前2层的参数会存放在第1到4个节点上面,因此我们只列出这4个节点,以及节点上面存放的模型参数和expert参数。
notions*说明
:
中,下标表示第1~4个expert对应的参数,上标表示TP的第1个切片。
那么在EP并行的情况下,设备之间的通讯需求分为下面2个部分:
token分发;
通过上面表格可以看到,节点1-4共享一套完整的expert参数。(1, 2), (2, 3)是2个TP组,意味着这2个节点上的数据是相同的。但因为sequence parallel的存在,组内会在sequence方向上进行拆分输入。(1, 2)和 (2, 3)这2个组之间分别组成DP组,所以输入数据是不同的。这也就导致了节点1-4每个节点上面的token都是不一样的,所以在进行路由前,需要把这几个节点上的token都gather起来,再进行全局的分配。
参数更新;
因为EP的存在,expert参数和其他参数的DP组是不一样的,因此,要把存放有相同expert参数的节点放到一个expert独有的DP组里面。
在megatron.core.parallel_state#initialize_model_parallel 中,上面2个组分别对应了变量_TENSOR_AND_EXPERT_PARALLEL_GROUP和_DATA_MODULO_EXPERT_PARALLEL_GROUP。
Router
路由是MoE中最重要的一环,决定了token与expert之间的对应关系。路由方式不光决定了模型的效果,同时也与负载均衡特性息息相关。按照路由的主体可以将路由方式分为3大类,分别是:token-based、expert-based、global assignment,大多数已有的路由方式都可以归纳到这个分类体系下。
megatron中支持了2种路由方式,分别是TopK和global assignment,下面我们分别介绍2种方法的实现。
global assignment
global assignment将token和expert之间的匹配当做一个全局最优的线性匹配问题,这样做的好处有:1. 在训练过程中,可以做到给每个expert分配相同的token,不需要进行负载均衡;2. 对于routing collapse问题有一定的抑制作用,因为会有token分配到次优的expert上面。
global assignment有很多种不同的解法,通常可能会想到的是Hungarian Algorithm,但是因为其并不能很好利用GPU的并行特点,下面介绍2种对于GPU计算友好的算法。
拍卖行算法
在global assignment第一次被提出的论文《BASE Layers: Simplifying Training of Large, Sparse Models》中,就使用了拍卖行算法作为问题的实现方式。这个算法通过模拟拍卖的过程计算全局最优,在开源框架fairseq中实现了该算法的源码,这里在源码的基础上加了一些必要的注解帮助理解算法的过程。
def balanced_assignment (scores, max_iterations=100 ) : # scores [8, 80] 8 experts, 80 jobs num_workers, num_jobs = scores.size() jobs_per_worker = num_jobs // num_workers value = scores.clone() # 每个job对每个worker的价值,刚开始出价是0,所以等于scores iterations = 0 cost = scores.new_zeros(1 , num_jobs) # 每个job上面的标价,初始为0 jobs_with_bids = zeros(num_workers).bool() # 每个worker绑定的job数 while not jobs_with_bids.all(): # top_values, top_index [8, 11] # value表示job对worker的竞标价值:job对worker的价值 - 商品的报价 # 商品的价值初始为0 top_values, top_index = topk(value, k=jobs_per_worker + 1 , dim=1 ) # worker进行加注 # 加注的量取决于当前job的竞标价值和次优价值之间的差异; # 显然这种规则可以避免过度的加注 bid_increments = top_values[:, :-1 ] - top_values[:, -1 :] + eps # 每次下注只下最高的jobs_per_worker个任务,也就是在最理想的情况下,可以一次中标全部 bids = scatter( zeros(num_workers, num_jobs), dim=1 , index=top_index[:, :-1 ], src=bid_increments ) if 0 < iterations < max_iterations: # If a worker won a job on the previous round, put in a minimal bid to retain # the job only if no other workers bid this round. bids[top_bidders, jobs_with_bids] = eps # Find the highest bidding worker per job # top_bids, top_bidders [1, 80] # 中标情况 top_bids, top_bidders = bids.max(dim=0 ) jobs_with_bids = top_bids > 0 top_bidders = top_bidders[jobs_with_bids] # Make popular items more expensive cost += top_bids # 更新job的标价 value = scores - cost # 更具新的价值,重新计算每个worker和job之间的价值 if iterations < max_iterations: # If a worker won a job, make sure it appears in its top-k on the next round # 如果竞标中了,把对应的value设置成无穷大,保证下一轮还会竞标 value[top_bidders, jobs_with_bids] = ∞ else : value[top_bidders, jobs_with_bids] = scores[top_bidders, jobs_with_bids] iterations += 1 return top_index[:,:-1 ].reshape(-1 )
如果对该方法感兴趣,在论文《Auction Algorithms for Network Flow Problems: A Tutorial Introductionl》中可以找到收敛到最优点的证明。
sinkhorn算法实现
相比于sinkhorn算法,它的一种特殊例子Wasserstein metric可能更出名一点,大名鼎鼎的WGAN中的W所代表的就是它。Wasserstein metric可以理解为两个不同分布之间的最短距离,同时也是Optimal Transport问题的最优解。
什么是Optimal Transport?我们可以举一个例子:假设你有10个仓库在不同的位置,然后你有5个顾客需要从你这里进货。每个仓库中的货物数量用向量
c表示,每个顾客需要的货物用向量
表示,c和r可以被看成2个分布。进货的成本可以被表示为一个矩阵
,同样任意一种进货的方式可以被表示为
。r和c之间的Optimal Transport任务可以看成找到整体成本最小的进货方式
,并且此时的进货成本可以被看做是Wasserstein metric。
在这个例子中,Optimal Transport的任务可以形式化写成下面这种方式:
sinkhorn算法在此基础上加入P的信息熵作为一个限制项,确保配货方式不会落入极端情况。对应到这面这个例子中,你可能并不想出现所有人都去一个仓库进货的情况。
该算法的求解方法如下:
given
:
,
,
and
initialize
:
repeat
scale the rows
such that the row sums match
scale the columns
such that the column sums match
until
convergence
回到MoE的router任务中,token和expert之间的最优匹配可以被看成是在token上的分布与expert上均匀分布之间的最优传输距离。因此可以用sinkhorn求解,但是我们并不关心传输距离,而是可以把行动矩阵
看做是一个加了均衡负载约束的喜好分布。
Megatron中使用的sinkhorn主要是为了得到全局最优分配矩阵,因此做了一些简化,与标准实现会有差异。
def sinkhorn (cost: torch.Tensor, tol: float = 0.0001 ) : """Sinkhorn based MoE routing function""" # 这里给的cost其实是logits,代表token和expert之间的匹配程度 cost = torch.exp(cost) # sinkhorn距离的最优解中的 $\alpha$,$\beta$ 分别是这里的d0和d1 d0 = torch.ones(cost.size(0 ), device=cost.device, dtype=cost.dtype) d1 = torch.ones(cost.size(1 ), device=cost.device, dtype=cost.dtype) eps = 0.00000001 error = 1e9 d1_old = d1 ## 原始分布和目标分布都是均匀分布,所以用1 / d0.size(0) 和 1 / d1.size(0) 表示 while error > tol: d0 = (1 / d0.size(0 )) * 1 / (torch.sum(d1 * cost, 1 ) + eps) d1 = (1 / d1.size(0 )) * 1 / (torch.sum(d0.unsqueeze(1 ) * cost, 0 ) + eps) error = torch.mean(torch.abs(d1_old - d1)) d1_old = d1 return d1 * cost * d0.unsqueeze(1 )
topK
topK的实现与transformers里MixtralSparseMoeBlock的实现类似,根据router输出的logits选出每个token对应的前k个expert,并用softmax计算出对应的prob,作为最终计算结果的调和参数。
代码对应如下:
image-20240319175203075
辅助loss的最佳实现
Router里常见的辅助loss有2种,分别是load-banlance loss和z-loss。前者是为了应对route collapse问题,就是让router的结果更加的均匀,不会出现集中在个别expert上的情况。后者是为了防止gating网络计算的logits过大导致Router收敛变慢的情况。这2个loss都是在gating网络计算的logits上面进行计算得到的,loss的计算方法也没有什么特殊的,只是介绍一下loss生效的方式。
常规的实现方法是将每一个gating网络上面计算的logits收集起来,在模型推理完后计算对应的loss,并加到言模型的交叉熵loss上面。transformers中的switch_transformers就是这样实现的,对应代码如下:
image-20240318103517524
这种方式的问题是如果开启了PP,每个stage都需要将对应的logits传递给下一个stage,当然这样做也没有太多问题。但是megatron里使用了一种更加简洁的方式,给人以耳目一新的感觉。
首先,megatron的实现方式不需要传递中间变量,而是将loss当做网络的一部分。这里我们从megatron中摘抄一段z-loss的实现代码。
def z_loss_func (logits, z_loss_coeff) : z_loss = torch.mean(torch.square(torch.logsumexp(logits, dim=-1 ))) * z_loss_coeff return z_lossclass Router (MegatronModule) : ... def apply_z_loss (self, logits) : if self.config.moe_z_loss_coeff is not None : z_loss = z_loss_func(logits, self.config.moe_z_loss_coeff) logits = MoEAuxLossAutoScaler.apply(logits, z_loss) return logits def forward (self,) : ... logits = self.apply_z_loss(logits) ... # l
是的,就是这么简单,z_loss_func函数接收logits并返回对应的loss,并且结果中包含了设定好的loss因子。loss并没有被返回并收集,而是直接作为网络的一个计算步骤,所以看起来MoEAuxLossAutoScaler是loss生效的关键。
下面是MoEAuxLossAutoScaler的代码片段:
class MoEAuxLossAutoScaler (torch.autograd.Function) : main_loss_backward_scale: torch.Tensor = torch.tensor(1.0 ) @staticmethod def forward (ctx, output: torch.Tensor, aux_loss: torch.Tensor) : ctx.save_for_backward(aux_loss) return output @staticmethod def backward (ctx, grad_output: torch.Tensor) : (aux_loss,) = ctx.saved_tensors aux_loss_backward_scale = MoEAuxLossAutoScaler.main_loss_backward_scale scaled_aux_loss_grad = torch.ones_like(aux_loss) * aux_loss_backward_scale return grad_output, scaled_aux_loss_grad @staticmethod def set_loss_scale (scale: torch.Tensor) : MoEAuxLossAutoScaler.main_loss_backward_scale = scale
看起来也很简单,下面分析一下。
MoEAuxLossAutoScaler
是一个
torch.autograd.Function
的类,也就是意味着pytorch可以根据自动微分功能计算对应的梯度。
forward
函数中接收logits和aux_loss 2个参数,所以
backward
函数必须返回2个梯度向量,分别对应2个输入。
backward
函数接收1个向量,因为只有logits参与了后续的计算。
backward
里做了2件事情,分别是将logits的参数透传给上一个运算,并给aux_loss向量返回一个全是1的梯度,从而使得aux_loss对应的梯度能够传递给前面的运算。
此时有一个问题,学习率是如何生效的呢?注意MoEAuxLossAutoScaler还有一个方法
set_loss_scale
,这个方法接收一个变量并赋值给静态变量main_loss_backward_scale,这个变量也会和backward中的梯度相乘,显然这个scale的作用就是将学习率传递给梯度。在
megatron/core/pipeline_parallel/schedules.py
中调用了这个函数,并将当前的学习率赋值给该静态变量。