使用Spark构建大规模特征训练样本

由于涉及业务保密点, 相关数据此文相比内部分享会有所阉割.

背景

近期算法团队在探索宽模型和深度学习, 我们团队针对算法业务开发了基于Spark处理大规模特征样本的程序,有了这套简单的框架之后特征样本构建和模型调参分开了, 专业的人做专业的事情, 大大提高了算法同学迭代模型的效率.

腾讯面向高维度的机器学习计算框架–Angel 在我司现有基础设施环境下, 只能稳定训练1亿维左右的特征, 但是根据线上的实际数据统计,例如百万级别的商品跟千万级别的用户交叉, 特征很容易就达到了百亿维 直接导致无法进行训练.考虑到大部分的交叉组合是长尾特征, 真正对权重的贡献并不会很大, 可以在特征预处理上提前过滤低频特征, 达到减维的效果,减少后续训练的压力.

整体的流程图

image

特征构建遇到的问题

1. 样本OneHot慢的问题

交叉组合后的训练样例中的特征值需要替换成对应的数值索引,也就是OneHot的过程, 以便Angel能直接训练这份数据.

特征索引映射 如表1格式所示

特征值 索引值
feature_user_item:user1_item2 1
feature_user_item:user2_item3 2
feature_user_shop:user1_shop1 3
feature_ctr:50 4

样例表2的格式如下所示

logid feature_user_item feature_user_shop feature_ctr
abc [user1_item2, user1_item3, user1_item4] [user1_shop1, user1_shop1] 40
cba [user2_item2, user1_item4] [user2_shop4, user2_shop5] 50
bac [user3_item7, user2_item2]   50

将样例表的的实际数据替换成对应的索引值, 希望将表2处理之后的数据如表3所示

logid feature_user_item feature_user_shop feature_ctr
abc [1] [2]  
cba     4
bac     4

为了实现这种计算,我们最原始的做法是将每一列的的数据,分别去跟索引表去join, join好一列之后再join下一列.这样在列数少的情况下, join的速度也不慢, 但是join花费的时间随着列数的增多而线性增长,加上有些列的数据类型是array, 需要将其展开成多列来进行join, 放大了问题, 处理的耗时达到了数个小时.严重减慢了算法同学迭代新特征的速度.

于是我们着手优化这个过程. 首先我们想到的点是将索引表广播出去, 这样就不用走merge join, 不用对样例表进行shuffle操作,索引表在比较小的时候,在现有的集群资源配置限制下(单个Executor的最大内存是32G,如果内存支持很大,就没后面什么事情了, 但是JVM的内存32G是道坎),大概是4KW的规模 广播出去是没有问题的, 能广播出去的情况下, join的时候也是非常快的, 时间减少到一个小时内.

当索引表规模达到5KW的时候,直接整表广播, Driver端GC就非常严重了, Executor也非常不稳定,当时比较费解, 单独把这部分数据加载到HashMap里面, 占用量只有大约Executor内存的20%左右,为啥gc会这么严重呢?后面去看了下Spark的原理,解决了心中的疑惑.

一个原因是Collect下来的是Row(org.apache.spark.sql.Row)对象,Row对象里面有个StructType属性,保留着schema信息,在这个场景, schema的信息比实际的数据还大,序列化是一笔不小的开销.

另有一个原因是Spark的广播机制

在Spark2.x中, 已经移除HTTPBroadcast, 仅有的一种实现是TorrentBroadcast.实现原理类似于大家常用的 BT下载技术。基本思想就是将数据分块成 data blocks,如果Executor 取到了一些 data blocks,那么这个 Executor 就可以被当作 data server 了,随着取到数据的 Executor 越来越多,有更多的 data server 加入,数据就很快能传播到全部的 Executor 那里去了.

在广播的过程中会将数据冗余一份到BlockManager,供其他Executor进行读取. 其原理如图所示:

image

在广播的过程中, Driver端和Executor端都会有短暂的时间达到2倍的内存占用,

通过了解了Spark广播的实现, 就可以解释广播5KW维特征的时GC严重的问题.

随着实验特征的迭代,表2的列数会不断的增多,处理时间会随着列数的增多而线性增加, 特征索引的规模增多,会导致广播的过程中GC问题越来越严重.

这个阶段需要解决2个问题

  1. 能将表1广播到顺利地广播到各个Executor
  2. 高效得替换每一列索引值

综合这两个问题, 我们想出了一个解决方案, 将表1先按照特征值字典序排好, 然后再重新编码, 用一个长度为max(索引值)长度的数组去存储, 索引值作为下标,对应的元素为特征值,将其广播到Executor之后, 接着遍历表2的每一行的每一列, 实际上就是对应的特征值, 去上面的数组中二分查找出对应的索引值并替换掉.

使用下标数据存储表1, 特征值按照平均长度64个字符计算, 每个字符占用1个字节, 5千万维特征需要3.2G内存,广播的实际表现还ok 1亿维特征的话需要占用6.4G内存, 按照广播的时候会有双倍内存占用的情况,gc会比较严重. 我们又想了一个折中的办法, 将字符串hash成long,long仅占用8字节,比起存储字符串来说大大节省了空间, hash的有个问题是可能会冲突, 由于8字节的hash映射空间有 -2^632^63-1, 我们使用的是BKDRHash, 在我们这个场景有少量冲突也是可以接受的, 由于这个方法可以大大节省占用的内存,1亿特征仅占用800M的内存, 广播起来毫无压力,对应的在遍历表2的时候, 需要先将特征值用同样的算法hash之后再进行查找. 经过这一轮优化之后, 相同资源的情况下,处理10亿样本, 5千万的特征, 耗时已经降低到半个小时了, 且内存情况相对稳定.

这种情况跑了一段时间之后, 特征规模上到1亿了, 发现这一步的耗时已经上升到45分钟了,分析了下特征的分布,发现连续特征离散化后特征在表2出现的频率很高,基本每一条数据都有其出现,但是这类特征在表1的分布不多, 这完全可以利用缓存把这类特征对应的索引值保存下来, 而没必要走hash之后再二分搜索,完全可以用很少量的空间节省大量的时间. 实际实现的时候,判断需要查找的特征值是否符合以上的这种情况, 如果符合的话, 直接用guava缓存表2的特征值->表1的索引值,实际统计的缓存命中率是99.98888%, 实际耗时下降得也很明显, 从之前的45分钟降到17分钟.当然缓存并不是银弹,在算hash的时候误用了缓存, 导致这一步的计算反而变得慢了, 因为hash的组合实在是太多了, 缓存命中率只有10%左右,需要有大量的资源浪费在维护缓存.在实际使用缓存的时候, 有必要去统计一下缓存的命中率,Guava就已经自带了这个工具.

2. 数据倾斜

1.数据不唯一

在整个过程中,我们使用了唯一数据标示计算了一个logid, 主要作用是

  1. 链路追踪

  2. 作为特征的join key

  3. 记录去重

在实际的数据中, 这个logid并不是唯一的, 因为整个日志收集链路(存在客户端重复上传, Kafka消费重复, hash冲突)是无法保证行为日志的logid的唯一性,在最开始的时候数据量小,表1能全部广播出去, 数据倾斜的问题就被掩盖了, 中间过程我们尝试了使用join的方式去解决问题, 由于2张都是大表,走的是Merge Join, 问题就暴露出来了,数据倾斜到单个任务上了,最严重的时候一个task卡了几个小时都没跑过去. 最后分析了才发现是数据的问题,如果没去重的话, 两者张表join的时候, 会出现多对多的关系, 在问题1中有多次join的话,这个问题会被放大,最终严重到几个小时都没跑完单个任务,从源头开始进行去重结合过滤全站爬虫的数据之后, 在这一阶段问题得到了解决.当然我们的最终方案采用了广播表1的所有数据, 所以这个问题导致的倾斜不存在了.

2. 特征的频次倾斜

在实际的数据中, 由于有活跃用户或者热门商品的存在, 特征的分布存在相对集中的情况, 在对这些特征统计的时候, 会有倾斜的问题,我们现在的做法是采用加盐的方式,为所有特征先加上固定范围的盐前缀,第一次聚合出带盐的特征统计,然后去盐之后二次统计.

Table of Contents