专栏名称: 大数据架构
分享和交流大数据领域技术,包括但不限于Storm、Spark、Hadoop等流行分布式计算系统,Kafka等分布式消息系统,PostgreSQL、MySQL等RDBMS。更多精彩好文见www.jasongj.com
目录
相关文章推荐
数据派THU  ·  【CIKM2024教程】高效的时间图学习:算 ... ·  昨天  
程序员鱼皮  ·  75k,直接封神! ·  4 天前  
程序员鱼皮  ·  75k,直接封神! ·  4 天前  
数据派THU  ·  干货 | 清华Python编程入门分享会第三期 ·  1 周前  
数据派THU  ·  11种经典时间序列预测方法:理论、Pytho ... ·  1 周前  
51好读  ›  专栏  ›  大数据架构

解决Spark数据倾斜(一) 分散同一Task的不同Key

大数据架构  · 公众号  · 大数据  · 2017-03-07 07:27

正文

点击上方
大数据架构 快速关注



分散同一Task的不同Key

本文结合实例分析了通过调整并行度和使用自定义Partitioner缓解Spark数据倾斜的原理与适用场景


什么是数据倾斜

对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。

  

何谓数据倾斜?数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。


数据倾斜是如何造成的

在Spark中,同一个Stage的不同Partition可以并行处理,而具有依赖关系的不同Stage之间是串行处理的。假设某个Spark Job分为Stage 0和Stage 1两个Stage,且Stage 1依赖于Stage 0,那Stage 0完全处理结束之前不会处理Stage 1。而Stage 0可能包含N个Task,这N个Task可以并行进行。如果其中N-1个Task都在10秒内完成,而另外一个Task却耗时1分钟,那该Stage的总时间至少为1分钟。换句话说,一个Stage所耗费的时间,主要由最慢的那个Task决定。


由于同一个Stage内的所有Task执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同Task之间耗时的差异主要由该Task所处理的数据量决定。


调整并行度分散同一个Task的不同Key

原理

Spark在做Shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的Key对应的数据被分配到了同一个Task上,造成该Task所处理的数据远大于其它Task,从而造成数据倾斜。


如果调整Shuffle时的并行度,使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。



案例

现有一张测试表,名为student_external,内有10.5亿条数据,每条数据有一个唯一的id值。现从中取出id取值为9亿到10.5亿的共1.5条数据,并通过一些处理,使得id为9亿到9.4亿间的所有数据对12取模后余数为8(即在Shuffle并行度为12时该数据集全部被HashPartition分配到第8个Task),其它数据集对其id除以100取整,从而使得id大于9.4亿的数据在Shuffle时可被均匀分配到所有Task中,而id小于9.4亿的数据全部分配到同一个Task中。处理过程如下


INSERT OVERWRITE TABLE test

SELECT CASE WHEN id < 940000000 THEN (9500000  + (CAST (RAND() * 8 AS INTEGER)) * 12 )

       ELSE CAST(id/100 AS INTEGER)

       END,

       name

FROM student_external

WHERE id BETWEEN 900000000 AND 1050000000;


通过上述处理,一份可能造成后续数据倾斜的测试数据即以准备好。接下来,使用Spark读取该测试数据,并通过groupByKey(12)对id分组处理,且Shuffle并行度为12。代码如下

public class SparkDataSkew {

  public static void main(String[] args) {

    SparkSession sparkSession = SparkSession.builder()

      .appName("SparkDataSkewTunning")

      .config("hive.metastore.uris", "thrift://hadoop1:9083")

      .enableHiveSupport()

      .getOrCreate();


    Dataset dataframe = sparkSession.sql( "select * from test");

    dataframe.toJavaRDD()

      .mapToPair((Row row) -> new Tuple2(row.getInt(0),row.getString(1)))

      .groupByKey(12)

      .mapToPair((Tuple2> tuple) -> {

        int id = tuple._1();

        AtomicInteger atomicInteger = new AtomicInteger(0);

        tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());

        return new Tuple2(id, atomicInteger.get());

      }).count();


      sparkSession.stop();

      sparkSession.close();

  }

  

}


本次实验所使用集群节点数为4,每个节点可被Yarn使用的CPU核数为16,内存为16GB。使用如下方式提交上述应用,将启动4个Executor,每个Executor可使用核数为12(该配置并非生产环境下的最优配置,仅用于本文实验),可用内存为12GB。


spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar


GroupBy Stage的Task状态如下图所示,Task 8处理的记录数为4500万,远大于(9倍于)其它11个Task处理的500万记录。而Task 8所耗费的时间为38秒,远高于其它11个Task的平均时间(16秒)。整个Stage的时间也为38秒,该时间主要由最慢的Task 8决定。


在这种情况下,可以通过调整Shuffle并行度,使得原来被分配到同一个Task(即该例中的Task 8)的不同Key分配到不同Task,从而降低Task 8所需处理的数据量,缓解数据倾斜。


通过groupByKey(48)将Shuffle并行度调整为48,重新提交到Spark。新的Job的GroupBy Stage所有Task状态如下图所示。


从上图可知,记录数最多的Task 20处理的记录数约为1125万,相比于并行度为12时Task 8的4500万,降低了75%左右,而其耗时从原来Task 8的38秒降到了24秒。


在这种场景下,调整并行度,并不意味着一定要增加并行度,也可能是减小并行度。如果通过groupByKey(11)将Shuffle并行度调整为11,重新提交到Spark。新Job的GroupBy Stage的所有Task状态如下图所示。


从上图可见,处理记录数最多的Task 6所处理的记录数约为1045万,耗时为23秒。处理记录数最少的Task 1处理的记录数约为545万,耗时12秒。


总结

适用场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。


解决方案:调整并行度。一般是增大并行度,但有时如本例减小并行度也可达到效果。


优势:实现简单,可在需要Shuffle的操作算子上直接设置并行度或者使用`spark.default.parallelism`设置。如果是Spark SQL,还可通过`SET spark.sql.shuffle.partitions=[num_tasks]`设置并行度。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。


劣势:适用场景少,只能将分配到同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。


自定义Partitioner

原理

使用自定义的Partitioner(默认为HashPartitioner),将原本被分配到同一个Task的不同Key分配到不同Task。


案例

以上述数据集为例,继续将并发度设置为12,但是在groupByKey算子上,使用自定义的Partitioner(实现如下)

.groupByKey(new Partitioner() {

  @Override

  public int numPartitions() {

    return 12;

  }


  @Override

  public int getPartition(Object key) {

    int id = Integer.parseInt(key.toString());

    if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {

      return (id - 9500000) / 12;

    } else {

      return id % 12;

    }

  }

})


由下图可见,使用自定义Partition后,耗时最长的Task 6处理约1000万条数据,用时15秒。并且各Task所处理的数据集大小相当。


总结

适用场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。


解决方案:使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。


优势:不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。


劣势:适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。


长按下方二维码可快速关注

版权声明

原创文章,始发自作者个人博客www.jasongj.com。作者[郭俊(Jason)],一位讲方法论的实干者。转载请在文章开头处注明转自【大数据架构】并以超链接形式注明原文链接http://www.jasongj.com/spark/skew/


点击“阅读全文”,查看作者个人博客