被好几个团队的人追着要渣B来分析一下DeepEP的工作, 公司内外的团队都有...简单的一句话说, 非常棒的工作,很多细节都值得学习. 但是还有一些硬件上的缺陷, 在DeepSeek-V3的论文中提出的建议要结合在一起看就会更清楚了. 我们还是由浅入深来谈谈EP并行, 并进一步分析一下这份出色的工作. 顺便展开讨论一下ScaleUP和ScaleOut网络遇到的难题和新的需求, DeepSeek采用了IB的技术, 虽然在github上说理论上兼容RoCE, 但是还有很多细节的地方需要探讨.
本文目录如下
1. EP并行概念 2. SGlang EP并行实现 3. DeepEP 3.1 用于通信的Buffer 3.2 用于训练和Prefill的高吞吐Kernel 3.2 .1 Dispatch通信流程 3.2 .1 .1 Notify_Dispatch 3.2 .1 .2 Intranode::dispatch 3.2 .1 .3 Internode::dispatch 3.2 .1 .3 .1 kRDMASender 3.2 .1 .3 .2 kRDMASenderCoordinator 3.2 .1 .3 .3 kRDMAAndNVLForwarder 3.2 .1 .3 .4 kForwarderCoordinator 3.2 .1 .3 .5 kNVLReceivers 3.2 .2 Combine流程 3.2 .2 .1 Intranode_Combine 3.2 .2 .2 Internode_Combine 3.2 .2 .2 .1 kNVLSender 3.2 .2 .2 .2 kNVLAndRDMAForwarder 3.2 .2 .2 .3 kRDMAReceiver 3.2 .2 .2 .4 kCoordinator 3.2 用于Decoding的低延迟Kernel 3.2 .1 LowLatency Layout 3.2 .2 低延迟Dispatch 3.2 .2 .1 SEND PHASE 3.2 .2 .2 RECV PHASE 3.2 .3 低延迟Combine 3.2 .3 .1 SEND PHASE 3.2 .3 .2 RECV PHASE 3.3 其它细节 3.3 .1 文档外行为的PTX指令 3.3 .2 Memory Order 3.3 .3 nvshmem库的修改 4. RoCE上运行DeepEP的挑战 4.1 DeepSeek用到的IB网络技术 4.2 RoCE使用DeepEP的问题 4.2 .1 Multi-Rail and Rail-Only拓扑的问题 4.2 .2 incast 4.2 .3 RC兼容 4.2 .4 In Network Computing如何做? 4.3 真正的适合EP的RDMA over Ethernet 5. 关于DeepSeek-V3论文的建议 6. 关于MoE模型的演进
1. EP并行概念
DeepSeek MoE的原理和演进在以前的一篇文章详细分析过
《详细谈谈DeepSeek MoE相关的技术发展》
Expert Parallelism(EP并行)如下图所示:
在DeepSeek-R1推理过程中, 为什么要实现EP并行? 从系统的角度来看, 单个专家的参数数据量为44MB,具体计算如下所示:
dim = 7168 inter_dim = 2048 tokens = 256 e = Expert(dim, inter_dim) from ptflops import get_model_complexity_info input_tokens = ( 1 ,tokens,dim)
flops, params = get_model_complexity_info(e, input_tokens, as_strings= True ,print_per_layer_stat= True ) Expert( 44.05 M, 100.000 % Params, 11.28 GMac, 99.995 % MACs, (w1): Linear( 14.68 M, 33.329 % Params, 3.76 GMac, 33.328 % MACs, in_features= 7168 , out_features= 2048 , bias= True ) (w2): Linear( 14.69 M, 33.341 % Params, 3.76 GMac, 33.340 % MACs, in_features= 2048 , out_features= 7168 , bias= True ) (w3): Linear( 14.68 M, 33.329 % Params, 3.76 GMac, 33.328 % MACs, in_features= 7168 , out_features= 2048 , bias= True )
如果一次加载44MB的专家参数权重, 而仅处理几个token是很不划算的, 同时多个专家放置在一台机器上, 很容相互之间加载干扰, 并且完全占满显存带宽. 整个系统的瓶颈在显存带宽上.
那么一个很朴素的想法就是通过多台机器并行, 把专家放置在多个机器上, 一方面等效的增加了显存的带宽, 另一方面也增加了专家参数权重的data locality. 然后用更大的batch(论文为256,开源代码为128)使得一次权重加载能够处理更多的token, 吞吐率就提升了.
但是难题就是需要大量的all2all的跨机通信, 此时如果网络通信的问题不解决好, EP并行的瓶颈就会转移到网络上, 这就是DeepEP工作的重要性了. 在谈DeepEP之前, 我们先来看看开源的EP并行实现.
2. SGlang EP并行实现
在RDMA网络中, 大规模组网下AlltoAll的通信效率都是一个非常大的问题, 因此开源推理软件如Sglang通过allreduce的方式来实现, 规避AlltoAll, 因此在开源社区如何用上DeepEP的工作, 还需要进一步分析. 这里对SGlang EP并行做一个简单的介绍, 来看一下
\python\sglang\srt\layers\moe\ep_moe\layer.py
中的EPMoE类, Sglang默认tp_size和ep_size相同, 然后它会根据自己的tp_rank来初始化和加载本地专家.
class EPMoE (torch.nn.Module) : def __init__ () : self.num_experts = num_experts assert self.num_experts % self.tp_size == 0 ## 根据节点的Rank计算出EP并行时, 该节点需要放置的专家. self.num_experts_per_partition = self.num_experts // self.tp_size self.start_expert_id = self.tp_rank * self.num_experts_per_partition self.end_expert_id = self.start_expert_id + self.num_experts_per_partition - 1
然后在该类的Foward函数中有详细的处理过程
def forward (self, hidden_states: torch.Tensor, router_logits: torch.Tensor) : ## 初始化GroupGemm ## 选择专家, 并根据输出的topk_ids进行重排序, 便于单个专家权重加载的时候复用内存处理多个token topk_weights, topk_ids = select_experts() reorder_topk_ids, src2dst, seg_indptr = run_moe_ep_preproess( topk_ids, self.num_experts ) # 重排序数据的预处理过程 pre_reorder_triton_kernel[(hidden_states.shape[ 0 ],)]() # 基于当前rank获取需要处理的segment seg_indptr_cur_rank = seg_indptr[self.start_expert_id : self.end_expert_id + 2 ] weight_indices_cur_rank = torch.arange( 0 , self.num_experts_per_partition, device=hidden_states.device, dtype=torch.int64, ) # 第一次GroupGemm做个Expert的Gate和Up-projection矩阵运算 gateup_output = self.grouped_gemm_runner( a=gateup_input, b=self.w13_weight, c=gateup_output, ....) # Expert 的激活函数处理 if self.activation == "silu" : silu_and_mul_triton_kernel[(gateup_output.shape[ 0 ],)]() # Expert的 down projection GroupGEMM处理 down_output = self.grouped_gemm_runner( a=down_input, b=self.w2_weight, c=down_output,) # Post重排 post_reorder_triton_kernel[(hidden_states.size( 0 ),)]( down_output, output, src2dst,...)
您肯定很好奇, 实际的函数中并没有看到alltoall通信? 它在MLA阶段会执行DP并行, 有一次allgather. 然后EP并行时, 每个卡都将Token和本地专家进行运算, 等完成Post_reorder后, 其实同一个token需要经过Expert处理后的数据, 都已经分散在不同卡上, 此时进行一次allreduce即可.
这种宏观上的allgather+allreduce 替代alltoall的做法通信量相对于alltoall直接dispatch/combine大了很多, 但有时候还是会选择, 因为工业界一直对alltoall通信的效率解决的并不好, 特别是在各种RoCE环境中, 由于incast等问题带来的长尾延迟, 实际通信效率还没有直接做AG+AR好. 本文后面的章节会详细讨论这个话题.
当然刚注意到SGLang已经在开始整合DeepEP了~
但是SGlang在RoCE上还有很多网络的调整需要处理, 后面详细介绍.
3. DeepEP
DeepEP是DeepSeek开源周第二天发布的一个项目, 解决了EP并行时AlltoAll通信的效率, 在Prefill和Training阶段提供高吞吐的能力, 同时在decoding阶段提供低延迟的传输. 同时还支持一些灵活的控制overlap计算和通信.
通信库的核心就是在对Buffer的高效管理和使用, 并且在延迟和吞吐之间进行TradeOff. 下面我们分为三个小章节来介绍.
3.1 用于通信的Buffer
DeepEP对外的工作原理是, 先通过
get_buffer
函数申请一段buffer, 并且可以通过
Buffer.set_num_sms(sm_num)
来设置需要用到的SM.
from deep_ep import Buffer, EventOverlap _buffer: Optional[Buffer] = None def get_buffer (group: dist.ProcessGroup, hidden_bytes: int) -> Buffer: # 计算NVLink和RDMA需要多少Bytes num_nvl_bytes, num_rdma_bytes = 0 , 0 for config in (Buffer.get_dispatch_config(group.size()), Buffer.get_combine_config(group.size())): num_nvl_bytes = max(config.get_nvl_buffer_size_hint(hidden_bytes, group.size()), num_nvl_bytes) num_rdma_bytes = max(config.get_rdma_buffer_size_hint(hidden_bytes, group.size()), num_rdma_bytes) # 为Buffer分配内存 if _buffer is None or _buffer.group != group or _buffer.num_nvl_bytes < num_nvl_bytes or _buffer.num_rdma_bytes < num_rdma_bytes: _buffer = Buffer(group, num_nvl_bytes, num_rdma_bytes) return _buffer
在
\deep_ep\buffer.py
中定义了一些buffer的 config
# Intranode if num_ranks <= 8 : return Config(Buffer.num_sms, 6 , 256 , 6 , 128 ) # Internode config_map = { 16 : Config(Buffer.num_sms, 16 , 288 , 20 , 128 ), 24 : Config(Buffer.num_sms, 8 , 288 , 32 , 128 ), 32 : Config(Buffer.num_sms, 8 , 288 , 32 , 128 ), 64 : Config(Buffer.num_sms, 20 , 288 , 28 , 128 ), 128 : Config(Buffer.num_sms, 20 , 560 , 32 , 128 ), 144 : Config(Buffer.num_sms, 32 , 720 , 12 , 128 ), 160 : Config(Buffer.num_sms, 28 , 720 , 12 , 128 ), }
后面四个参数定义在
csrc\config.hpp
中定义
struct Config { int num_sms; int num_max_nvl_chunked_send_tokens; int num_max_nvl_chunked_recv_tokens; int num_max_rdma_chunked_send_tokens; int num_max_rdma_chunked_recv_tokens; }
这些config在
tests\
目录下的几个测试例中看到, 应该是专门做过performance tuning, 但是在H20/H100等其它实例上, 不同的RDMA网络配比和不同的NVLink带宽, 还需要一定的修改. 未来或许需要针对不同的设备定义单独的config_file.
Buffer类型在
csrc\kernel\buffer.cuh
中定义了三类, 通过这些结构体,可以灵活地管理 GPU 内存缓冲区,支持多种内存管理和访问模式
Buffer
管理单个缓冲区,支持基本的内存分配和访问
AsymBuffer
管理多个缓冲区,支持多个Rank
SymBuffer
Decouple模式分别设置
send_buffer
和
recv_buffer
, 非解偶模式调用
buffer
在
csrc\deep_ep.cpp
中定义了Buffer类, 构造函数
Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_bytes, bool low_latency_mode)
初始化缓冲区然后定义了一系列方法
用于训练和Prefill的高吞吐Normal Kernel
[intranode/internode]_[dispatch/combine]
用于推理Decoding的低延迟Kernel
low_latency_[dispatch/combine]
基于topk_idx和num_experts产生dispatch layout的函数
get_dispatch_layout
定义了一个任务fifo队列和移动fifo slot的私有方法
move_fifo_slots()
Task Fifo Size如下所示:
// Task fifo memory int64_t fifo_bytes = sizeof ( int ) * NUM_MAX_FIFO_SLOTS; int64_t buffer_ptr_bytes = sizeof ( void *) * NUM_MAX_NVL_PEERS; int64_t task_ptr_bytes = sizeof ( int *) * NUM_MAX_NVL_PEERS;
在Host侧还有一些私有Counter用于记录MoE专家相关的信息
// Host-side MoE info volatile int * moe_recv_counter = nullptr ; int * moe_recv_counter_mapped = nullptr ; // Host-side expert-level MoE info volatile int * moe_recv_expert_counter = nullptr ; int * moe_recv_expert_counter_mapped = nullptr ; // Host-side RDMA-level MoE info volatile int * moe_recv_rdma_counter = nullptr ; int * moe_recv_rdma_counter_mapped = nullptr ;
3.2 用于训练和Prefill的高吞吐Kernel
训练和Prefill阶段batch相对较大对延迟不是很敏感, 因此主要是最大化的利用带宽, README中的测试数据显示基本上带宽已经打满
看了DeepEP的通信库, 更加能够理解MoE Group设计的方法, 从原始论文的3.2可以知道, 它在训练阶段采用EP=64的方式并行, 正好8台H800的机器, 然后MoE分为8个Group并根据GroupScore选择4台进行通信, 这也就是论文所述的TOken的分发限制在4个节点.
另一方面由于有算法上的专家负载均衡, 统计上每个Token选择专家的概率应该是相对均衡的. 因此从通信上来看, 对于单个GPU, 首先根据Group idx直接将数据发送到该Group idx对应的remote节点相同Rank的GPU, 然后再通过机内NVLink通信. 在DeepEP代码中通过
intranode(NVLink)
和
internode(RDMA)
来区分这两种通信模式.
在调用dispatch之间通过
get_dispatch_layout
获得需要发送的Token,哪些要通过RDMA,以及token是否需要传输到某个Rank
# Calculate layout before actual dispatch num_tokens_per_rank, num_tokens_per_rdma_rank, num_tokens_per_expert, is_token_in_rank, previous_event = \ _buffer.get_dispatch_layout(topk_idx, num_experts, previous_event=previous_event, async_finish= True , allocate_on_comm_stream=previous_event is not None )
其中topk_idx为MoEGating函数产生的
[num_tokens, num_topk]
数组, 通过dispatch_layout输出的结果为
num_tokens_per_rank:
[num_ranks]
的数组, 每个Rank需要发送的token数量
num_tokens_per_rdma_rank:
[num_rdma_ranks]
的数组, 在每个RDMA Rank上需要发送的token数量
num_tokens_per_expert:
[num_experts]
数组, 基于每个专家粒度的token数量统计.
is_token_in_rank:
[num_tokens, num_ranks]
二维数组, 数据类型为bool型, 标记token是否从某个rank发送.
3.2.1 Dispatch通信流程
dispatch具体的通信流程如下图所示:
详细代码在
csrc\deep_ep.cpp
中, 以
intranode_dispatch
为例
Buffer::intranode_dispatch( const torch::Tensor& x, //Tensor数据x const std ::optional<:tensor>& x_scales, //用于FP8缩放的值 const std ::optional<:tensor>& topk_idx, // MoE Gating产生的专家index const std ::optional<:tensor>& topk_weights, //MoE Gating产生的weight const std ::optional<:tensor>& num_tokens_per_rank, //每个Rank的token数 const torch::Tensor& is_token_in_rank, //标记token是否从某个rank发送 const std ::optional<:tensor>& num_tokens_per_expert, //基于专家的token数统计 int cached_num_recv_tokens, //需要缓存的token数 const std ::optional<:tensor>& cached_rank_prefix_matrix, //[num_ranks,num_ranks]矩阵 const std ::optional<:tensor>& cached_channel_prefix_matrix, //[num_ranks, num_channels]的矩阵 int expert_alignment, const Config& config, std ::optional& previous_event, bool async, bool allocate_on_comm_stream //是否需要异步执行和分配通信的cudastream ) {
当
cached_rank_prefix_matrix
参数有值时,
cached_mode=True
. 然后根据Config中定义的SM数量定义Channel, 一个channel使用两个block, 偶数block用于发送, 奇数用于接收, 因此Channel数为config.num_sms的一半, 然后会对输入参数进行一系列validation. 如果
allocate_on_comm_stream==True
则会创建comm_stream, 并等待compute stream或者参数中的previous_event完成
auto compute_stream = at::cuda::getCurrentCUDAStream(); if (allocate_on_comm_stream) { EP_HOST_ASSERT(previous_event.has_value() and async); at::cuda::setCurrentCUDAStream(comm_stream); } // Wait previous tasks to be finished if (previous_event.has_value()) { stream_wait(comm_stream, previous_event.value()); } else { stream_wait(comm_stream, compute_stream); }
3.2.1.1 Notify_Dispatch
然后通过
notify_dispatch
函数launch kernel执行
notify_dispatch
, 通过多个节点交换将发送信息同步,并统计需要接收的Token数目以及统计几个prefix_matrix
//初始化接收counter *moe_recv_counter = -1 , *moe_recv_rdma_counter = -1 ; for ( int i = 0 ; i < num_local_experts; ++ i) moe_recv_expert_counter[i] = -1 ; //调用notify_dispath internode::notify_dispatch( num_tokens_per_rank->data_ptr< int >(), //每个Rank需要发送的token数量 moe_recv_counter_mapped, num_ranks, num_tokens_per_rdma_rank->data_ptr< int >(), //dispatch_layout的结果,在每个RDMA Rank上需要发送的token数量 moe_recv_rdma_counter_mapped, num_tokens_per_expert->data_ptr< int >(), //dispatch_layout的结果,基于专家的token数统计 moe_recv_expert_counter_mapped, num_experts, is_token_in_rank.data_ptr< bool >(), //[num_tokens, num_ranks]二维数组, 数据类型为bool型, 标记token是否从某个rank发送. num_tokens, num_channels, //token数和Channel数 hidden_int4, num_scales, num_topk, expert_alignment, rdma_channel_prefix_matrix.data_ptr< int >(), //{num_rdma_ranks, num_channels} recv_rdma_rank_prefix_sum.data_ptr< int >(), //{num_rdma_ranks} gbl_channel_prefix_matrix.data_ptr< int >(), //{num_ranks, num_channels} recv_gbl_rank_prefix_sum.data_ptr< int >(), //{num_ranks} rdma_buffer_ptr, config.num_max_rdma_chunked_recv_tokens, buffer_ptrs_gpu, config.num_max_nvl_chunked_recv_tokens, task_fifo_ptrs_gpu, head, rank, comm_stream, config.get_rdma_buffer_size_hint(hidden_int4 * sizeof (int4), num_ranks), num_nvl_bytes, low_latency_mode);
当
sm_id == 0
时:首先全局同步和清理缓冲区,然后将每个rank和expert的token数量通过
nvshmem_int_put_nbi
发送到RDMA ranks. 然后将计算好的token数量发送到同一节点内的其他ranks,使用
buffer_ptrs
和
nvl_send_buffer
. 接下来计算各个rank和expert的token数量的前缀和,更新
recv_rdma_rank_prefix_sum
和
recv_gbl_rank_prefix_sum
. 最后更新全局计数器
moe_recv_counter_mapped
和
moe_recv_expert_counter_mapped
, 最终再次使用
nvshmem_barrier_with_same_gpu_idx
和
barrier_device
确保所有线程完成任务 当
sm_id != 0
时,计算与channel相关的元数据,使用 warp_reduce_sum 进行warp级别的归约操作,计算每个channel的token数量同时计算
rdma_channel_prefix_matrix
和
gbl_channel_prefix_matrix
的前缀和,确保后续任务的调度和分配
在CPU上会根据notify的结果分配新的Tensor
// Allocate new tensors auto recv_x = torch::empty({num_recv_tokens, hidden}, x.options()); auto recv_topk_idx = std ::optional<:tensor>(), recv_topk_weights = std ::optional<:tensor>(), recv_x_scales = std ::optional<:tensor>(); auto recv_src_meta = std ::optional<:tensor>(); auto recv_rdma_channel_prefix_matrix = std ::optional<:tensor>(); auto recv_gbl_channel_prefix_matrix = std ::optional<:tensor>(); auto send_rdma_head = std ::optional<:tensor>(); auto send_nvl_head = std ::optional<:tensor>(); if ( not cached_mode) { recv_src_meta = torch::empty({num_recv_tokens, internode::get_source_meta_bytes()}, dtype(torch::kByte).device(torch::kCUDA)); recv_rdma_channel_prefix_matrix = torch::empty({num_rdma_ranks, num_channels}, dtype(torch::kInt32).device(torch::kCUDA)); recv_gbl_channel_prefix_matrix = torch::empty({num_ranks, num_channels}, dtype(torch::kInt32).device(torch::kCUDA)); send_rdma_head = torch::empty({num_tokens, num_rdma_ranks}, dtype(torch::kInt32).device(torch::kCUDA)); send_nvl_head = torch::empty({num_rdma_recv_tokens, NUM_MAX_NVL_PEERS}, dtype(torch::kInt32).device(torch::kCUDA)); }
最后调用
dispatch
函数执行分发, dispatch函数在RDMA和NVLINK的实现分别在
csrc\kernels\internode.cu
和
csrc\kernels\intranode.cu
主要目的就是将token, topk_idx和weight放入缓冲区.
3.2.1.2 Intranode::dispatch
intranode::dispatch
在NVLINK上执行收发, 首先它会根据
const bool is_sender = sm_id % 2 == 0;
判断是sender还是reciver. 然后计算分配每个线程需要处理的channel和rank.
初始化发送参数,包括计算任务范围、队列起始和结束偏移量
分批发送数据,包括token、topk索引、权重等,并更新尾指针
从发送缓冲区复制数据到目标缓冲区,包括token、源索引、topk索引和权重等
3.2.1.3 Internode::dispatch
internode::dispatch
涉及RDMA上的操作, 相对更加复杂, 它分配了多种WarpRole
enum class WarpRole { kRDMASender, //负责将本地数据通过RDMA发送到远程节点 kRDMASenderCoordinator, //管理RDMA发送的进度与同步 kRDMAAndNVLForwarder, //将RDMA接收的数据转发到本地NVLink缓冲区 kForwarderCoordinator, //全局协调转发任务 kNVLReceivers //从NVLink缓冲区读取数据并写入最终接收数组 };
3.2.1.3.1 kRDMASender
首先通过
get_channel_task_range
获取任务范围, 得到起始和结束的token索引
token_start_idx
,
token_end_idx
. 然后通过将
rdma_send_channel
的tail指针设置为0清除share memory.并将发送的下一个token的idx指向
token_start_idx
.
然后将这个Channel需要发送的token数量以负数
-value-1
的方式发送给其它节点, 然后同步等待.
for ( int dst_rdma_rank = warp_id; dst_rdma_rank < kNumRDMARanks; dst_rdma_rank += kNumDispatchRDMASenderWarps) { // 根据不同的lane_id设置send_buffer的内容 nvshmemx_int_put_nbi_warp(rdma_channel_meta.recv_buffer(rdma_rank), rdma_channel_meta.send_buffer(dst_rdma_rank), NUM_MAX_NVL_PEERS * 2 + 2 , translate_dst_rdma_rank(dst_rdma_rank, nvl_rank)); } nvshmem_fence(); sync_rdma_sender_smem();
然后开始迭代处理Token, 将token拷贝到buffer中, 获取锁并更新尾指针. 然后复制数据到缓冲区, 并广播元数据和其他相关信息(如
x
、
x_scales
、
topk_idx
和
topk_weights
),复制的代码如下
// Copy `x` into symmetric send buffer auto st_broadcast = [=]( const int key, const int4& value) { #pragma unroll for ( int j = 0 ; j < num_topk_ranks; ++ j) st_na_global( reinterpret_cast (dst_send_buffers[j]) + key, value); }; UNROLLED_WARP_COPY( 5 , lane_id, hidden_int4, 0 , x + token_idx * hidden_int4, ld_nc_global, st_broadcast); # pragma unroll for ( int i = 0 ; i < num_topk_ranks; ++ i) dst_send_buffers[i] = reinterpret_cast (dst_send_buffers[i]) + hidden_int4; // Copy source metadata into symmetric send buffer if (lane_id < num_topk_ranks) st_na_global( reinterpret_cast (dst_send_buffers[lane_id]), src_meta); # pragma unroll
for ( int i = 0 ; i < num_topk_ranks; ++ i) dst_send_buffers[i] = reinterpret_cast (dst_send_buffers[i]) + 1 ; // Copy `x_scales` into symmetric send buffer # pragma unroll for ( int i = lane_id; i < num_scales; i += 32 ) { auto value = ld_nc_global(x_scales + token_idx * num_scales + i); # pragma unroll for ( int j = 0 ; j < num_topk_ranks; ++ j) st_na_global( reinterpret_cast < float *>(dst_send_buffers[j]) + i, value); } # pragma unroll for ( int i = 0 ; i < num_topk_ranks; ++ i) dst_send_buffers[i] = reinterpret_cast < float *>(dst_send_buffers[i]) + num_scales; // Copy `topk_idx` and `topk_weights` into symmetric send buffer # pragma unroll for ( int i = lane_id; i < num_topk * num_topk_ranks; i += 32 ) { auto rank_idx = i / num_topk, copy_idx = i % num_topk; auto idx_value = static_cast < int >(ld_nc_global(topk_idx + token_idx * num_topk + copy_idx)); auto weight_value = ld_nc_global(topk_weights + token_idx * num_topk + copy_idx); st_na_global( reinterpret_cast < int *>(dst_send_buffers[rank_idx]) + copy_idx, idx_value); st_na_global( reinterpret_cast < float *>(dst_send_buffers[rank_idx]) + num_topk + copy_idx, weight_value); }
需要注意的是这段代码很巧妙, 在store的时候采用了指令
st.global.L1::no_allocate
避免L1的allocation, 虽然注释说猜测L2也没有allocate, 但是我个人觉得这样一条指令直接写透到HBM应该是不行的, 应该是会在L2Cache上放置然后write back.
// `st.global.L1::no_allocate` will be translated into `ST.E.NA.[width]` in SASS, // which does not have cache allocation (obviously in L1, I guess not in L2 too) # ifndef DISABLE_AGGRESSIVE_PTX_INSTRS # define ST_NA_FUNC "st.global.L1::no_allocate" # else # define ST_NA_FUNC "st.global" # endif
最后完成一些收尾工作, 例如更新尾指针并释放锁等. 然后同步.
3.2.1.3.2 kRDMASenderCoordinator
这段代码实现了RDMA发送协调者的逻辑,主要功能包括:
计算每个RDMA rank需要发送的token数量
迭代所有RDMA rank,检查是否有未处理的token,并发送RDMA请求
3.2.1.3.3 kRDMAAndNVLForwarder
这段代码主要实现了RDMA消费者和NVL生产者的通信逻辑,具体包括以下几个模块:
计算目标NVL rank (dst_nvl_rank) 和 RDMA rank (dst_rank)
确定每个rank负责的专家范围 (dst_rank_expert_begin 和 dst_rank_expert_end)
2.使用循环不断检查RDMA通道的元数据是否准备好
通过
ld_volatile_global
读取rdma_channel_meta的接收buffer读取元数据
元数据包含四个值 (meta_0, meta_1, meta_2, meta_3),用于确定接收token的数量和其他信息,
如果元数据满足条件,则进行处理并通知NVL ranks
3.然后使用
__syncwarp()
同步所有线程,确保所有线程一致地执行后续操作
4.根据接收到的token数量调整发送NVL头部的位置
5.调用
sync_forwarder_smem()
确保共享内存已被清理,为后续操作做准备
6.转发RDMA缓冲区中的token到NVL缓冲区:
将RDMA缓冲区中的token复制到NVL缓冲区,并更新相关指针
3.2.1.3.4 kForwarderCoordinator
负责协调各个转发器warp,确保数据的同步和头尾指针的正确更新,使用共享内存进行通信和状态跟踪它内部有一个死循环, 在循环中找到最小的没有retired的Channel的头指针
min_head
, 如果所有值都是最大值,则判断所有的Channel都处于retired状态,并推出循环. 否则, 更新远端头指针并发送数据.最后有一个nanosleep允许其他warp有机会执行.
3.2.1.3.5 kNVLReceivers
首先从从barrier结果中获取
src_nvl_rank
,并计算总的偏移量
total_offset
, 然后通过循环检查每个lane, 并计算
num_tokens_to_recv
然后通过一个条件为
num_tokens_to_recv>0
的循环拷贝搬迁数据, 包括
data
,
source meta
,
fp8 scale
,
topk_idx
和
topk_weights
等.
最后dispatch函数返回recv的数据
return recv_x, recv_topk_idx, recv_topk_weights, num_recv_tokens_per_expert_list, handle, event
3.2.2 Combine流程
然后Combine阶段的代码也类似, 但是它可以复用dispatch_layout的信息, 在
deep_ep/buffer.py
中定义的combine函数如下:
def combine (self, x: torch.Tensor, handle: Tuple, topk_weights: Optional[torch.Tensor] = None, config: Optional[Config] = None, previous_event: Optional[EventOverlap] = None, async_finish: bool = False, allocate_on_comm_stream: bool = False) -> \ Tuple[torch.Tensor, Optional[torch.Tensor], EventOverlap]:
其中 x为
[num_tokens, hidden]
的BF16数组, topk_weights:
[num_tokens, num_topk]
的数组, 记录MoE Gate函数得到的权重信息.
handle
通信用的handler, 可以从dispatch函数获得, handle包含了如下信息
# intranode handle handle = (rank_prefix_matrix, channel_prefix_matrix, recv_channel_prefix_matrix, recv_src_idx, is_token_in_rank, send_head) # internode handle handle = (is_token_in_rank, rdma_channel_prefix_matrix, gbl_channel_prefix_matrix, recv_rdma_channel_prefix_matrix, recv_rdma_rank_prefix_sum, recv_gbl_channel_prefix_matrix, recv_gbl_rank_prefix_sum, recv_src_meta, send_rdma_head, send_nvl_head)
函数返回的结果包含
combined_x
,
combined_topk_weights
和
event
.
3.2.2.1 Intranode_Combine
同样也是根据SM数量分配接收和发送的SM, Channel数为SM数量的一半, 一个Channel分别由两个SM负责收发, 偶数为发送SM,奇数为接收SM, 通过
is_sender = sm_id % 2 == 0
判断, 因为在Combine阶段还有reduce的操作, 为了保证store的时候尽量高效, 使用了按照int4来切分workload的方式
constexpr int kDtypePerInt4 = sizeof (int4) / sizeof ( dtype_t ); int hidden_int4 = hidden * sizeof ( dtype_t
) / sizeof (int4); auto x_int4 = reinterpret_cast < const int4*>(x); auto recv_int4 = reinterpret_cast (recv_x);
发送SM的功能如下
1.计算每个线程所属的warp和rank, 并确定其在rank中的位置
num_send_warps
:发送线程的warp数量,
num_send_warps_per_rank
:每个Rank负责的warp数量,
num_threads_per_rank
:每个秩的线程数量,
send_thread_id
、
send_lane_id
、
send_rank_id
和
send_warp_id_in_rank
分别表示当前线程ID、线程在warp中的位置、线程所属的rank以及线程在rank内的warp ID.
2. 根据Rank ID计算缓冲区的指针, 用于后续数据访问
ptr
:指向当前秩的缓冲区起始地址,
channel_head_idx
、
channel_tail_idx
、
channel_x_buffers
、
channel_src_idx_buffers
和
channel_topk_weights_buffers
分别表示Channel的头部索引、尾部索引、数据缓冲区、源索引缓冲区和topk权重缓冲区.
3.根据prefix matrix计算每个线程需要处理的token范围
rank_offset
和
num_rank_tokens
:计算rank的偏移量和token数量,
channel_offset
和
num_channel_tokens
:计算channel的偏移量和token数量,
token_start_idx
和
token_end_idx
:确定当前线程要处理的token范围.
4. 基于
token_start_idx
和
token_end_idx
按照chunk迭代发送数据
首先获取一个empty slot, 然后使用
UNROLLED_WARP_COPY
宏复制数据到目标缓冲区, 再发送
source index
和
topk weights
到
channel_src_idx_buffers
和
channel_topk_weights_buffers
最后更新尾指针
channel_tail_idx
接收SM的功能如下
其中一个warp用于队列更新, 其他的用于reduce接收的数据. 首先初始化用于接收的共享变量
// Shared head, tail and retired flags for receiver warps __shared__ volatile int warp_channel_head_idx[num_recv_warps][kNumRanks]; //每个warp的channel头索引 __shared__ volatile int channel_tail_idx[kNumRanks]; //每个Rank的channel尾索引 __shared__ volatile bool warp_retired[num_recv_warps]; //每个接收 warp 是否已Retried的标志
对于threadid < 32的第一个warp, 负责更新队列头尾的idx, 并检查是否所有的接收warp已经处于retired状态, 如果都已经是retire状态退出循环. 否则根据其它warp的最小头索引
min_head
更新.
其他的warp则进行接收数据的reduce操作,
// Reduce data # pragma unroll for ( int i = recv_lane_id; i < hidden_int4; i += 32 ) { // Read buffers int4 recv_value_int4[kNumRanks]; # pragma unroll for ( int j = 0 ; j < num_topk_ranks; ++ j) recv_value_int4[j] = ld_nc_global(channel_x_buffers[topk_ranks[j]].buffer() + slot_indices[j] * hidden_int4 + i); // Reduce all-to-all results float values[kDtypePerInt4] = { 0 }; # pragma unroll for ( int j = 0 ; j < num_topk_ranks; ++ j) { auto recv_value_dtypes = reinterpret_cast < const dtype_t *>(&recv_value_int4[j]); # pragma unroll for ( int k = 0 ; k < kDtypePerInt4; ++ k) values[k] += static_cast < float >(recv_value_dtypes[k]); } // Cast back to `dtype_t` and write int4 out_int4; auto out_dtypes = reinterpret_cast < dtype_t *>(&out_int4); # pragma unroll for ( int j = 0 ; j < kDtypePerInt4; ++ j) out_dtypes[j] = static_cast < dtype_t >(values[j]); recv_int4[token_idx * hidden_int4 + i] = out_int4; }
3.2.2.2 Internode_Combine
internode::combine
涉及RDMA和NVLINK操作更加复杂一些, Reduce操作定义了一个独立的函数
combine_token
计算方式和前一节类似, 然后Internode_combine warp分为如下4种WarpRole
enum class WarpRole { kNVLSender, //从NVLINK上发送 kNVLAndRDMAForwarder, //NVLink到RDMA转发 kRDMAReceiver, //RDMA接收 kCoordinator //协调器 };
它将一个Channel 解耦到2个SM
const auto rdma_rank = rank / NUM_MAX_NVL_PEERS, nvl_rank = rank % NUM_MAX_NVL_PEERS; auto role_meta = [=]() -> std ::pairint > { auto warp_id = thread_id / 32 ; if ( not is_rdma_receiver_sm) { if (warp_id < NUM_MAX_NVL_PEERS) { auto shuffled_warp_id = warp_id; shuffled_warp_id = (shuffled_warp_id + channel_id) % NUM_MAX_NVL_PEERS; return {WarpRole::kNVLSender, shuffled_warp_id}; } else if (warp_id < NUM_MAX_NVL_PEERS + kNumForwarders) { auto shuffled_warp_id = warp_id - NUM_MAX_NVL_PEERS; shuffled_warp_id = (shuffled_warp_id + channel_id) % kNumForwarders; return {WarpRole::kNVLAndRDMAForwarder, shuffled_warp_id}; } else { return {WarpRole::kCoordinator, 0 }; } } else { if (warp_id < NUM_MAX_NVL_PEERS + kNumForwarders) { return {WarpRole::kRDMAReceiver, warp_id}; } else { return {WarpRole::kCoordinator, 0 }; } } }();
3.2.2.2.1 kNVLSender
这段代码实现了NVLink通信中的数据发送逻辑。主要功能包括:
每次发送时检查是否有空闲槽位,并同步发送数据到目标缓冲区
3.2.2.2.2 kNVLAndRDMAForwarder
主要是将NVLink上收到的数据从RDMA转发. 首先调整NVL Buffer的指针位置, 然后清除共享内存并同步. 然后在NVLink向RDMA转发之前, 会调用
combine_token
进行一次reduce操作. z最后再从RDMA上发送数据.
3.2.2.2.3 kRDMAReceiver
使用
get_channel_task_range
函数获取当前Channel的任务范围(
token_start_idx
和
token_end_idx
)在循环中迭代,从rdma_channel_data.recv_buffer(src_rdma_rank)加载数据, 然后执行reduce操作
combine_token
3.2.2.2.4 kCoordinator
同步共享内存状态, 并更新RDMA rank和NVL rank的min_head
3.2 用于Decoding的低延迟Kernel
Decoding阶段为了降低延迟只使用了RDMA进行点到点通信, 测试性能如下
这里的延迟应该是指的生产网络环境中按照128个token作为一个batch(论文是256), 8个routed expert, 然后采用FP8 dispatch和BF16进行combine的处理方式的整体延迟, 带宽基本上也能打满.
需要注意的是, 在Decoding阶段点到点通信高负载时由于路由冲突会带来拥塞, 需要打开IB的自适应路由的功能, 另一方面如果decode阶段的流量相对轻载时, Adaptive routing可能会导致一些额外的延迟, 此时可以采用静态路由的方式, 这些内容我们将在下一章节进行详细分析.
在Decoding阶段采用了IBGDA的原因是避免CPU参与, 使得CUDA Graph可以直接调度, 论文中提到的两个microbatch overlap也有了对应的实现建议
通过一个钩子函数(hook), RDMA网络流量在后台进行,不会占用任何 GPU SMs 的计算部分.Overlap的部分可以根据负载调整. 执行Decoding阶段EP并行时, 首先也要通过get_buffer函数获取缓冲区, 并利用
low_latency_dispatch
和
low_latency_combine
方法进行alltoall处理.
3.2.1 LowLatency Layout
在
csrc\config.hpp
中定义了一个
LowLatencyBuffer
结构, dispatch和combine都分离了发送和接收Buffer, 添加了接收count和rdma原子操作的token_counter.然后combine阶段有一个
recv_flag
buffer
struct LowLatencyBuffer { int num_clean_int = 0 ; void * dispatch_rdma_send_buffer = nullptr ; void * dispatch_rdma_recv_data_buffer = nullptr ; int * dispatch_rdma_recv_count_buffer = nullptr ; int * dispatch_rdma_atomic_token_counter = nullptr ; void * combine_rdma_send_buffer = nullptr ; void * combine_rdma_recv_data_buffer = nullptr ; int * combine_rdma_recv_flag_buffer = nullptr ; std ::pair< int *, int > clean_meta () { EP_HOST_ASSERT(dispatch_rdma_recv_count_buffer == combine_rdma_recv_flag_buffer); return {dispatch_rdma_recv_count_buffer, num_clean_int}; } };
LowLatencyLayout 采用了两个对称的缓冲区, 交替使用减少等待时间, 分为三组buffer:
send
,
recv
,
signaling
,
struct LowLatencyLayout { size_t total_bytes = 0 ; LowLatencyBuffer buffers[ 2 ]; template < typename out_ptr_t = void *, typename count_ptr_t = uint8_t *, typename in_ptr_t = void *> out_ptr_t advance( const in_ptr_t & ptr, size_t count) { return reinterpret_cast < out_ptr_t >( reinterpret_cast < count_ptr_t >(ptr) + count); } LowLatencyLayout( void * rdma_buffer, int num_max_dispatch_tokens_per_rank, int hidden, int num_ranks, int num_experts) { const int num_scales = hidden / 128 ; const int num_local_experts = num_experts / num_ranks; // Dispatch and combine layout: // - 2 symmetric odd/even send buffer // - 2 symmetric odd/even receive buffers // - 2 symmetric odd/even signaling buffers
然后传输消息的Size为:
// Message sizes EP_HOST_ASSERT(num_scales * sizeof ( float ) <= hidden); size_t num_bytes_per_dispatch_msg = hidden + num_scales * sizeof ( float ) + sizeof (int4); size_t num_bytes_per_combine_msg = sizeof (int4) + hidden * sizeof (nv_bfloat16);
send
,
recv
,
signaling
缓冲区的定义如下
// Send buffer size_t dispatch_send_buffer_bytes = num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg; size_t combine_send_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg; size_t send_buffer_bytes = std ::max(dispatch_send_buffer_bytes, combine_send_buffer_bytes); EP_HOST_ASSERT(send_buffer_bytes % sizeof (int4) == 0 ); total_bytes += send_buffer_bytes * 2 ; // Symmetric receive buffers // TODO: optimize memory usages size_t dispatch_recv_data_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg; size_t combine_recv_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg; size_t recv_buffer_bytes = std ::max(dispatch_recv_data_buffer_bytes, combine_recv_buffer_bytes); EP_HOST_ASSERT(recv_buffer_bytes % sizeof (int4) == 0 ); total_bytes += recv_buffer_bytes * 2 ; // Symmetric signaling buffers size_t dispatch_recv_count_buffer_bytes = num_experts * sizeof ( int ); size_t dispatch_recv_atomic_token_counter_bytes = num_local_experts * sizeof ( int );
size_t combine_recv_flag_buffer_bytes = dispatch_recv_count_buffer_bytes; size_t signaling_buffer_bytes = std ::max(dispatch_recv_count_buffer_bytes + dispatch_recv_atomic_token_counter_bytes, combine_recv_flag_buffer_bytes); total_bytes += signaling_buffer_bytes * 2 ;
3.2.2 低延迟Dispatch
调用的函数如下, 注释中有一个非常关键的信息
compatible with CUDA graph
这也是为什么使用IBGDA的原因, 传统的RDMA需要以消息粒度发送, 并使用WRITE_WITH_IMM消息,将立即数作为CQE传递给CPU, CPU再进一步launch kernel, 这样的方式打断了CUDA graph使得延迟增大. 另一个关键点是采用了
double-batch overlapping
的方式, 然后可以通过第三天开源的DeepGEMM进行MoE的专家矩阵计算.
def low_latency_dispatch (hidden_states: torch.Tensor, topk_idx: torch.Tensor, num_max_dispatch_tokens_per_rank: int, num_experts: int) : global _buffer # Do MoE dispatch, compatible with CUDA graph (but you may restore some buffer status once you replay) recv_hidden_states, recv_expert_count, handle, event, hook = \ _buffer.low_latency_dispatch(hidden_states, topk_idx, num_max_dispatch_tokens_per_rank, num_experts, async_finish= False , return_recv_hook= True ) # NOTES: the actual tensor will not be received only if you call `hook()`, # it is useful for double-batch overlapping, but **without any SM occupation** # If you don't want to overlap, please set `return_recv_hook=False` # Later, you can use our GEMM library to do the computation with this specific format return recv_hidden_states, recv_expert_count, handle, event, hook
num_max_dispatch_tokens_per_rank
为一次batch的
num_tokens=128
. 然后将调用
csrc\deep_ep.cpp
中的
Buffer::low_latency_dispatch
函数. 该函数首先分配
LowLatencyLayout
, 然后等待前面的task完成后, 分配packed tensor
// Allocate packed tensors auto packed_recv_x = torch::empty({num_local_experts, num_ranks * num_max_dispatch_tokens_per_rank, hidden}, x.options().dtype(torch::kFloat8_e4m3fn)); auto packed_recv_src_info = torch::empty({num_local_experts, num_ranks * num_max_dispatch_tokens_per_rank}, torch::dtype(torch::kInt32).device(torch::kCUDA)); auto packed_recv_layout_range = torch::empty({num_local_experts, num_ranks}, torch::dtype(torch::kInt64).device(torch::kCUDA)); auto packed_recv_count = torch::from_blob(buffer.dispatch_rdma_atomic_token_counter, {num_local_experts}, torch::dtype(torch::kInt32).device(torch::kCUDA));
另外还分配了FP8细粒度量化时使用的scale, 并且考虑到TMA加载的优化需要保证num_token能整除4
// Allocate column-majored scales EP_HOST_ASSERT((num_ranks * num_max_dispatch_tokens_per_rank) % 4 == 0 and "TMA requires the number of tokens to be multiple of 4" ); auto packed_recv_x_scales = torch::empty({num_local_experts, num_scales, num_ranks * num_max_dispatch_tokens_per_rank}, torch::dtype(torch::kFloat32).device(torch::kCUDA)); packed_recv_x_scales = torch::transpose(packed_recv_x_scales, 1 , 2 );
Fine-graine quantization所使用的Scale-Factor如图所示:
然后就是调用
csrc\kernels\internode_ll.cu
的dispatch kernel. 它通过一个
phase
变量判断是
LOW_LATENCY_SEND_PHASE
还是
LOW_LATENCY_RECV_PHASE
.
3.2.2.1 SEND PHASE
SEND阶段的Warp分为两类, 第一类执行FP8转换和发送TopK token, 第二类为最后一个warp用于读取topk_idx并统计per-expert信息.FP8转换的算法如下, 转换后将其写入到发送Buffer中.
// FP8 cast
# pragma unroll for ( int i = thread_id; i < hidden_bf16_int4; i += num_threads) { // Read and calculate local amax auto int4_value = __ldg(x_int4 + i); auto bf16_values = reinterpret_cast (&int4_value); float fp32_values[kNumElemsPerRead]; float amax = kFP8Margin, scale, scale_inv; # pragma unroll for ( int j = 0 ; j < kNumElemsPerRead; ++ j) { fp32_values[j] = static_cast < float >(bf16_values[j]); amax = fmaxf(amax, fabsf(fp32_values[j])); } // Reduce amax and scale EP_STATIC_ASSERT(kNumElemsPerRead * 32 / kNumPerChannels == 2 , "Invalid vectorization" ); amax = half_warp_reduce_max(amax), scale = kFP8Amax / amax, scale_inv = amax * kFP8AmaxInv; if (lane_id == 0 or lane_id == 16 ) rdma_x_scales[i * kNumElemsPerRead / 128 ] = scale_inv; // Cast into send buffer int2 int2_value; auto fp8x2_values = reinterpret_cast < __nv_fp8x2_storage_t *>(&int2_value); # pragma unroll for ( int j = 0 ; j < kNumElemsPerRead; j += 2 ) { float2 fp32x2 = {fp32_values[j] * scale, fp32_values[j + 1 ] * scale}; fp8x2_values[j / 2 ] = __nv_cvt_float2_to_fp8x2(fp32x2, __NV_SATFINITE, __NV_E4M3); } rdma_x_int2[i] = int2_value; }
然后通过调用IBGDA进行发送到指定的slot, 这样可以实现在AdaptiveRouting开启时发送不用保序, 发送完成后更新原子计数器
// Issue IBGDA sends if (dst_expert_idx >= 0 ) { int slot_idx = lane_id == 0 ? atomicAdd(atomic_counter_per_expert + dst_expert_idx, 1 ) : 0 ; slot_idx = __shfl_sync( 0xffffffff , slot_idx, 0 ); const auto dst_rank = dst_expert_idx / num_local_experts; const auto dst_expert_local_idx = dst_expert_idx % num_local_experts; const auto src_ptr = reinterpret_cast < uint64_t >(rdma_x_int2); const auto dst_ptr = reinterpret_cast < uint64_t >(rdma_recv_x) + dst_expert_local_idx * num_ranks * num_max_dispatch_tokens_per_rank * num_bytes_per_msg + rank * num_max_dispatch_tokens_per_rank * num_bytes_per_msg + slot_idx * num_bytes_per_msg; if (dst_rank != rank) { nvshmemi_ibgda_put_nbi_warp(dst_ptr, src_ptr, num_bytes_per_msg, dst_rank, dst_expert_local_idx, lane_id, slot_idx); } else { // NOTES: only 2 load iterations for 7K hidden with 8 unrolls const auto * src_int4_ptr = reinterpret_cast < const int4*>(src_ptr); const auto * dst_int4_ptr = reinterpret_cast (dst_ptr); UNROLLED_WARP_COPY( 8 , lane_id, num_int4_per_msg, dst_int4_ptr, src_int4_ptr, ld_nc_global, st_na_global); } // Increase counter after finishing __syncwarp(); lane_id == 0 ? atomic_add_release_global(atomic_finish_counter_per_expert + dst_expert_idx, 1 ) : 0 ; }
最后一个Warp用于分配任务给不同的SM并处理Expert任务分发和同步.
3.2.2.2 RECV PHASE
接收采用两个sub warp交替处理, 首先根据
responsible_expert_idx
计算
src_rank
和本地专家索引
local_expert_idx
通过
nvshmemi_ibgda_poll_recv(src_rank, local_expert_idx)
进行polling.然后拷贝Token和相应的scale数据和source info.
3.2.3 低延迟Combine
同样也分为SEND和RECV两个PHASE, 调用方式如下
def low_latency_combine (hidden_states: torch.Tensor, topk_idx: torch.Tensor, topk_weights: torch.Tensor, handle: Tuple) : global _buffer # Do MoE combine, compatible with CUDA graph (but you may restore some buffer status once you replay) combined_hidden_states, event_overlap, hook = \ _buffer.low_latency_combine(hidden_states, topk_idx, topk_weights, handle, async_finish= False , return_recv_hook= True ) # NOTES: the same behavior as described in the dispatch kernel