【大数据】Spark优化经验&案例--数据倾斜 《Mysql必知必会》读书笔记 jar包名中自动添加git commit id PyCharm教学视频学习笔记 《SQL基础教程》简要总结 《设计师要懂心理学》读书笔记 MySQL与MariaDB学习笔记 WDT (Folly) 安装指南 -- CentOS 7 [solved]Page build failed(Jekyll) 数据包过滤及分析实例 tshark tcpdump Scala Tour 学习总结 “Docker容器和容器云”读书笔记(1) “Docker Practice”读书笔记 “图解基础设施设计模式”小结 “图解服务器端网络架构”小结 Python网络安全编程 数据包解析笔记 华为挑战赛(1) DDoS攻击防御与云服务 基于网络回溯分析技术的异常行为分析 “Linux程序设计”小结(进程间通信) C语言编程规范(华为软件精英挑战赛) 2017阿里在线编程题--单源最短路径问题 2017年阿里在线编程题-- 数串分组 Uinx/Linux上的帮助查询命令 你懂C,所以C++不在话下 一篇特别长的总结(C专家编程) 程序员面试金典--笔记(精华篇) C陷阱与缺陷--笔记 半小时搭建电子商务网站--opencart linux网络知识和工具(持续更新) 网卡参数查询及设置工具ethtool 高性能流量生成工具trafgen(DDoS模拟) Linux流量控制工具TC 流量控制工具TC详细说明 tcpdump过滤数据包,结果不对? Lecture 网络攻击与防御技术笔记 gotgit-git权威指南 高效使用MacOS所要知道的 shell内置字符串处理 配置ntp(知其所以然) 360黑客攻防技术分享会--记录 中毒U盘恢复--快捷键病毒 Tor--anonymity network介绍(PPT) IBM bluemix 再读《Linux Shell脚本攻略》 linux shell 学习摘记(9) linux shell 学习摘记(8) linux shell 学习摘记(7) linux shell 学习摘记(6) linux shell 学习摘记(5) linux shell 学习摘记(4) linux shell 学习摘记(3) linux shell 学习摘记(2) linux shell 学习摘记(1) firefox vim 插件 vimperator A Byte of Vim 笔记 windows注册表小知识 安全测试工具篇(开源&商业) 安全及性能测试工具(网站收集) 性能测试工具 屡试不爽的“3个”iPad使用技巧 Shell Shortcuts(和Tab键一样实用) vim--自动添加jekyll post信息头 vim 自动给文件添加头部信息 GitHub Tips (很实用,值得收藏) Linux路由、防火墙、NAT命令

【大数据】Spark优化经验&案例--数据倾斜

2020年04月15日

博客链接: http://codeshold.me/2020/04/spark_optimize_skew.html

[TOC]

0. 十秒看完

1.业务处理中存在复杂的多表关联和计算逻辑(原始数据达百亿数量级)
2.优化后,spark计算性能提升了约12倍(6h–>30min)
3.最终,业务的性能瓶颈存在于ES写入(计算结果,ES索引document数约为21亿 pri.store.size300gb

示意图

1. 背景

  1. 业务数据不断增大, Spark运行时间越来越长, 从最初的半小时到6个多小时
  2. 某日Spark程序运行6.5个小时后, 报“Too large frame…”的异常
    • org.apache.spark.shuffle.FetchFailedException: Too large frame: 2624680416

2. 原因分析

2.1. 抛出异常的原因

  1. Spark uses custom frame decoder (TransportFrameDecoder) which does not support frames larger than 2G. This lead to fails when shuffling using large partitions. 链接
  2. 根本原因: 源数据的某一列(或某几列)分布不均匀,当某个shuffle操作是根据此列数据进行shuffle时,就会造成整个数据集发生倾斜,即某些partition包含了大量数据,超出了2G的限制。
  3. 异常,就是发生在业务数据处理的最后一步left join操作

2.2. 粗暴的临时解决方法

  1. 增大partition数, 让partition中的数据量<2g
    • 由于是left join触发了shuffle操作, 而spark默认join时的分区数为200(即spark.sql.shuffle.partitions=200), 所以增大这个分区数, 即调整该参数为800, 即 spark.sql.shuffle.partitions=800

2.3. 解决效果

  1. Spark不再报错,而且“艰难”的跑完了, 跑了近6个小时!
  2. 通过Spark UI页面的监控发现, 由于数据倾斜导致, 整个Spark任务的运行时间是被少数的几个Task“拖累的”
    c9b9ce9cb0ceefc332895fd2c9502d6b.png@w=620

3. 思考优化

3.1. 确认数据倾斜

  • 方法一: 通过sample算子对DataSet/DataFrame/RDD进行采样, 找出top n的key值及数量
  • 方法二: 源数据/中间数据落到存储中(如HIVE), 直接查询观察

3.2. 可选方法

  1. HIVE ETL 数据预处理
    • 把数据倾斜提前到 HIVE ETL中, 避免Spark发生数据倾斜
    • 这个其实很有用
  2. 过滤无效的数据 (where / filter)
    • NULL值数据
    • “脏数据”(非法数据)
    • 业务无关的数据
  3. 分析join操作, 左右表的特征, 判断是否可以进行小表广播 broadcast
    • 这样可避免shuffle操作,特别是当大表特别大
    • 默认情况下, join时候, 如果表的数据量低于spark.sql.autoBroadcastJoinThreshold参数值时(默认值为10 MB), spark会自动进行broadcast, 但也可以通过强制手动指定广播
      • visitor_df.join(broadcast(campaign_df), Seq("random_bucket", "uuid", "time_range"), "left_outer")
      • 业务数据量是100MB
    • Driver上有一个campaign_df全量的副本, 每个Executor上也会有一个campaign_df的副本
    • JOIN操作, Spark默认都会进行 merge_sort (也需要避免倾斜)
  4. 数据打散, 扩容join
    • 分散倾斜的数据, 给key加上随机数前缀
    • A.join(B)
    • a0929ecd6d862e87e6dfc2dbb77eac65.png@w=620
  5. 提高shuffle操作并行度
    • spark.sql.shuffle.partitions
  6. 多阶段
    • aggregate操作: 先局部聚合, 再全局聚合
      1. 给key打随机值, 如打上1-10, 先分别针对10个组做聚合
      2. 最后再统一聚合
    • join操作: 切成多个部分, 分开join, 最后union
      1. 判断出,造成数据倾斜的一些key值 (可通过观察或者sample取样)
        • 如主号
      2. 单独拎出来上述key值的记录做join, 剩余记录再做join
        • 独立做优化, 如broadcast
      3. 结果数据union即可

3.3. 实际采的方法

  • HIVE 预处理
  • 过滤无效的数据
  • broadcast
  • 打散 –> 随机数
  • shuffle 并行度

Example:

......
visitor_leads_fans_df.repartition($"random_index")
    .join(broadcast(campaign_df), Seq("random_bucket", "uuid", "time_range"), "left_outer")
    .drop("random_bucket", "random_index")
......

REF


知识共享许可协议
SWF's Hacking Dreamonephone 创作,采用 知识共享 署名-非商业性使用 4.0 国际 许可协议进行许可。
© 2011-2020. All rights reserved by onephone. Powerd by Jekyll.