使用Spark构建大规模特征训练样本
由于涉及业务保密点, 相关数据此文相比内部分享会有所阉割.
背景
近期算法团队在探索宽模型和深度学习, 我们团队针对算法业务开发了基于Spark处理大规模特征样本的程序,有了这套简单的框架之后特征样本构建和模型调参分开了, 专业的人做专业的事情, 大大提高了算法同学迭代模型的效率.
腾讯面向高维度的机器学习计算框架–Angel 在我司现有基础设施环境下, 只能稳定训练1亿维左右的特征, 但是根据线上的实际数据统计,例如百万级别的商品跟千万级别的用户交叉, 特征很容易就达到了百亿维 直接导致无法进行训练.考虑到大部分的交叉组合是长尾特征, 真正对权重的贡献并不会很大, 可以在特征预处理上提前过滤低频特征, 达到减维的效果,减少后续训练的压力.
整体的流程图
特征构建遇到的问题
1. 样本OneHot慢的问题
交叉组合后的训练样例中的特征值需要替换成对应的数值索引,也就是OneHot的过程, 以便Angel能直接训练这份数据.
特征索引映射 如表1格式所示
特征值 | 索引值 |
---|---|
feature_user_item:user1_item2 | 1 |
feature_user_item:user2_item3 | 2 |
feature_user_shop:user1_shop1 | 3 |
feature_ctr:50 | 4 |
样例表2的格式如下所示
logid | feature_user_item | feature_user_shop | feature_ctr |
---|---|---|---|
abc | [user1_item2, user1_item3, user1_item4] | [user1_shop1, user1_shop1] | 40 |
cba | [user2_item2, user1_item4] | [user2_shop4, user2_shop5] | 50 |
bac | [user3_item7, user2_item2] | 50 |
将样例表的的实际数据替换成对应的索引值, 希望将表2处理之后的数据如表3所示
logid | feature_user_item | feature_user_shop | feature_ctr |
---|---|---|---|
abc | [1] | [2] | |
cba | 4 | ||
bac | 4 |
为了实现这种计算,我们最原始的做法是将每一列的的数据,分别去跟索引表去join, join好一列之后再join下一列.这样在列数少的情况下, join的速度也不慢, 但是join花费的时间随着列数的增多而线性增长,加上有些列的数据类型是array, 需要将其展开成多列来进行join, 放大了问题, 处理的耗时达到了数个小时.严重减慢了算法同学迭代新特征的速度.
于是我们着手优化这个过程. 首先我们想到的点是将索引表广播出去, 这样就不用走merge join, 不用对样例表进行shuffle操作,索引表在比较小的时候,在现有的集群资源配置限制下(单个Executor的最大内存是32G,如果内存支持很大,就没后面什么事情了, 但是JVM的内存32G是道坎),大概是4KW的规模 广播出去是没有问题的, 能广播出去的情况下, join的时候也是非常快的, 时间减少到一个小时内.
当索引表规模达到5KW的时候,直接整表广播, Driver端GC就非常严重了, Executor也非常不稳定,当时比较费解, 单独把这部分数据加载到HashMap里面, 占用量只有大约Executor内存的20%左右,为啥gc会这么严重呢?后面去看了下Spark的原理,解决了心中的疑惑.
一个原因是Collect下来的是Row(org.apache.spark.sql.Row)对象,Row对象里面有个StructType属性,保留着schema信息,在这个场景, schema的信息比实际的数据还大,序列化是一笔不小的开销.
另有一个原因是Spark的广播机制
在Spark2.x中, 已经移除HTTPBroadcast, 仅有的一种实现是TorrentBroadcast.实现原理类似于大家常用的 BT下载技术。基本思想就是将数据分块成 data blocks,如果Executor 取到了一些 data blocks,那么这个 Executor 就可以被当作 data server 了,随着取到数据的 Executor 越来越多,有更多的 data server 加入,数据就很快能传播到全部的 Executor 那里去了.
在广播的过程中会将数据冗余一份到BlockManager,供其他Executor进行读取. 其原理如图所示:
在广播的过程中, Driver端和Executor端都会有短暂的时间达到2倍的内存占用,
-
Driver端
Driver端先把数据序列成byteArray, 切割成小块的data block再广播出去, 切割的过程,内存会不断接近2倍byteArray的大小, 直到切割完之后,将byteArray释放掉.
-
Executor端
Executor装载广播的数据是driver的反过程, 每次拿到一个data block之后, 就将其存放到BlockManager, 同时通知Driver的BlockManagerMaster说这个block多了一个存储的地方,供其他Executor下载.等Executor把所有的block都从其他地方拿全之后,先申请一个Array[Byte],将block的数据进行反序列化之后得到原始数据.这个过程中和Driver端一样,内存会不断接近2倍数据的大小, 直到反序列化完成.
通过了解了Spark广播的实现, 就可以解释广播5KW维特征的时GC严重的问题.
随着实验特征的迭代,表2的列数会不断的增多,处理时间会随着列数的增多而线性增加, 特征索引的规模增多,会导致广播的过程中GC问题越来越严重.
这个阶段需要解决2个问题
- 能将表1广播到顺利地广播到各个Executor
- 高效得替换每一列索引值
综合这两个问题, 我们想出了一个解决方案, 将表1先按照特征值字典序排好, 然后再重新编码, 用一个长度为max(索引值)长度的数组去存储, 索引值作为下标,对应的元素为特征值,将其广播到Executor之后, 接着遍历表2的每一行的每一列, 实际上就是对应的特征值, 去上面的数组中二分查找出对应的索引值并替换掉.
使用下标数据存储表1, 特征值按照平均长度64个字符计算, 每个字符占用1个字节, 5千万维特征需要3.2G内存,广播的实际表现还ok 1亿维特征的话需要占用6.4G内存, 按照广播的时候会有双倍内存占用的情况,gc会比较严重. 我们又想了一个折中的办法, 将字符串hash成long,long仅占用8字节,比起存储字符串来说大大节省了空间, hash的有个问题是可能会冲突, 由于8字节的hash映射空间有 -2^63
到 2^63-1
, 我们使用的是BKDRHash, 在我们这个场景有少量冲突也是可以接受的, 由于这个方法可以大大节省占用的内存,1亿特征仅占用800M的内存, 广播起来毫无压力,对应的在遍历表2的时候, 需要先将特征值用同样的算法hash之后再进行查找. 经过这一轮优化之后, 相同资源的情况下,处理10亿样本, 5千万的特征, 耗时已经降低到半个小时了, 且内存情况相对稳定.
这种情况跑了一段时间之后, 特征规模上到1亿了, 发现这一步的耗时已经上升到45分钟了,分析了下特征的分布,发现连续特征离散化后特征在表2出现的频率很高,基本每一条数据都有其出现,但是这类特征在表1的分布不多, 这完全可以利用缓存把这类特征对应的索引值保存下来, 而没必要走hash之后再二分搜索,完全可以用很少量的空间节省大量的时间. 实际实现的时候,判断需要查找的特征值是否符合以上的这种情况, 如果符合的话, 直接用guava缓存表2的特征值->表1的索引值,实际统计的缓存命中率是99.98888%, 实际耗时下降得也很明显, 从之前的45分钟降到17分钟.当然缓存并不是银弹,在算hash的时候误用了缓存, 导致这一步的计算反而变得慢了, 因为hash的组合实在是太多了, 缓存命中率只有10%左右,需要有大量的资源浪费在维护缓存.在实际使用缓存的时候, 有必要去统计一下缓存的命中率,Guava就已经自带了这个工具.
2. 数据倾斜
1.数据不唯一
在整个过程中,我们使用了唯一数据标示计算了一个logid, 主要作用是
-
链路追踪
-
作为特征的join key
-
记录去重
在实际的数据中, 这个logid并不是唯一的, 因为整个日志收集链路(存在客户端重复上传, Kafka消费重复, hash冲突)是无法保证行为日志的logid的唯一性,在最开始的时候数据量小,表1能全部广播出去, 数据倾斜的问题就被掩盖了, 中间过程我们尝试了使用join的方式去解决问题, 由于2张都是大表,走的是Merge Join, 问题就暴露出来了,数据倾斜到单个任务上了,最严重的时候一个task卡了几个小时都没跑过去. 最后分析了才发现是数据的问题,如果没去重的话, 两者张表join的时候, 会出现多对多的关系, 在问题1中有多次join的话,这个问题会被放大,最终严重到几个小时都没跑完单个任务,从源头开始进行去重结合过滤全站爬虫的数据之后, 在这一阶段问题得到了解决.当然我们的最终方案采用了广播表1的所有数据, 所以这个问题导致的倾斜不存在了.
2. 特征的频次倾斜
在实际的数据中, 由于有活跃用户或者热门商品的存在, 特征的分布存在相对集中的情况, 在对这些特征统计的时候, 会有倾斜的问题,我们现在的做法是采用加盐的方式,为所有特征先加上固定范围的盐前缀,第一次聚合出带盐的特征统计,然后去盐之后二次统计.