众所周知,自2015年以来微博的业务发展迅猛。如果根据内容来划分,微博的业务有主信息(Feed)流、热门微博、微博推送(Push)、反垃圾、微博分发控制等。每个业务都有自己不同的用户构成、业务关注点和数据特征。庞大的用户基数下,由用户相互关注衍生的用户间关系,以及用户千人千面的个性化需求,要求我们用更高、更大规模的维度去刻画和描绘用户。大体量的微博内容,也呈现出多样化、多媒体化的发展趋势。
一直以来,微博都尝试通过机器学习来解决业务场景中遇到的各种挑战。本文为新浪微博吴磊在CCTC 2017云计算大会Spark峰会所做分享《基于Spark的大规模机器学习在微博的应用》主题的一部分,介绍微博在面对大规模机器学习的挑战时,采取的最佳实践和解决方案。
Spark Mllib
针对微博近百亿特征维度、近万亿样本量的模型训练需求,我们首先尝试了Apache Spark原生实现的逻辑回归算法。采用该方式的优点显而易见,即开发周期短、试错成本低。我们将不同来源的特征(用户、微博内容、用户间关系、使用环境等)根据业务需要进行数据清洗、提取、离散化,生成Libsvm格式的可训练样本集,再将样本喂给LR算法进行训练。在维度升高的过程中,我们遇到了不同方面的问题,并通过实践提供了解决办法。
【Stack overflow】
栈溢出的问题在函数嵌套调用中非常普遍,但在我们的实践中发现,过多Spark RDD的union操作,同样会导致栈溢出的问题。解决办法自然是避免大量的RDD union,转而采用其他的实现方式。
【AUC=0.5】
在进行模型训练的过程中,曾出现测试集AUC一直停留在0.5的尴尬局面。通过仔细查看训练参数,发现当LR的学习率设置较大时,梯度下降会在局部最优左右摇摆,造成训练出来的模型成本偏高,拟合性差。通过适当调整学习率可以避免该问题的出现。
【整型越界】
整型越界通常是指给定的数据值过大,超出了整形(32bit Int)的上限。但在我们的场景中,导致整型越界的并不是某个具体数据值的大小,而是因为训练样本数据量过大、HDFS的分片过大,导致Spark RDD的单个分片内的数据记录条数超出了整型上限,进而导致越界。Spark RDD中的迭代器以整数(Int)来记录Iterator的位置,当记录数超过32位整型所包含的范围(2147483647),就会报出该错误。
解决办法是在Spark加载HDFS中的HadoopRDD时,设置分区数,将分区数设置足够大,从而保证每个分片的数据量足够小,以避免该问题。可以通过公式(总记录数/单个分片记录数)来计算合理的分区数。
【Shuffle fetch failed】
在分布式计算中,Shuffle阶段不可避免,在Shuffle的Map阶段,Spark会将Map输出缓存到本机的本地文件系统。当Map输出的数据较大,且本地文件系统存储空间不足时,会导致Shuffle中间文件的丢失,这是Shuffle fetch failed错误的常见原因。但在我们的场景中,我们手工设置了spark.local.dir配置项,将其指向存储空间足够、I/O效率较高的文件系统中,但还是碰到了该问题。
通过仔细查对日志和Spark UI的记录,发现有个别Executor因任务过重、GC时间过长,丢失了与Driver的心跳。Driver感知不到这些Executor的心跳,便主动要求Yarn的Application master将包含这些Executor的Container杀掉。
皮之不存、毛之焉附,Executor被杀掉了,存储在其中的Map输出信息自然也就丢了,造成在Reduce阶段,Reducer无法获得属于自己的那份Map输出。解决办法是合理地设置JVM的GC设置,或者通过将spark.network.timeout的时间(默认60s)设置为120s,该时间为Driver与Executor心跳通信的超时时间,给Executor足够的响应时间,让其不必因处理任务过重而无暇与Driver端通信。
通过各种优化,我们将模型的维度提升至千万维。当模型维度冲击到亿维时,因Spark Mllib LR的实现为非模型并行,过高的模型维度会导致海森矩阵呈指数级上涨,导致内存和网络I/O的极大开销。因此我们不得不尝试其他的解决方案。
原文链接:
http://geek.csdn.net/news/detail/203834?url_type=39&object_type=webpage&pos=1