Spark data sync

Last updated on January 17, 2025

🧙 Questions

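I bought a server with a fairly modest spec and installed YARN, Hive and the related services on it.
4 cores and 32GB of memory should be enough, yet every Spark job fails to run properly,
and the CPU is constantly maxed out.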

☄️ Ideas

Job configuration parameters

Key parameters in local mode

spark.memory.fraction: 0.7
spark.sql.shuffle.partitions: 300
spark.shuffle.io.maxRetries: 10
spark.shuffle.io.retryWait: 10s
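  • hive.metastore.uris
    Connection address of the Hive metastore
  • spark.executor.instances
    Number of executors to request
  • spark.executor.memory
    JVM heap size of each executor, default 1g. The overhead (spark.executor.memoryOverhead, at least 384MB) is allocated on top of this value, not taken out of it. Inside the heap Spark reserves 300MB; spark.memory.fraction (default 60%) of the rest is the unified execution + storage region, and the remaining 40% is user memory
  • spark.executor.cores
    Number of cores per executor
  • spark.memory.fraction
    Fraction of (heap - 300MB) shared by execution and storage, default 0.6; the other 40% is user memory for user data structures and Spark's internal metadata
  • spark.memory.storageFraction
    Part of the unified region in which cached data is immune to eviction, default 0.5; the official docs recommend leaving it unchanged. It also applies to the driver, and the value must be in [0, 1)
  • spark.executor.memoryOverhead
    Extra non-heap memory per executor (VM overhead, native memory, etc.), added to spark.executor.memory when the container is requested; defaults to spark.executor.memory * spark.executor.memoryOverheadFactor, with a minimum of 384MB
  • spark.executor.memoryOverheadFactor
    Executor overhead as a fraction of executor memory, default 0.1; must be greater than 0.0
  • spark.executor.heartbeatInterval
    Default 10s. Interval at which each executor sends a heartbeat to the driver; if no heartbeat arrives within the timeout, the driver marks the executor as dead. Must be smaller than spark.network.timeout
  • spark.network.timeout
    Default 120s; default timeout for all RPC/network interactions in Spark
  • spark.driver.memory
    Driver memory, default 1g
  • spark.driver.cores
    Driver cores, default 1
  • spark.driver.memoryOverhead
    Driver overhead memory, at least 384MB
  • spark.driver.memoryOverheadFactor
    Driver overhead memory as a fraction of driver memory
  • spark.sql.shuffle.partitions
    Default 200; number of partitions (tasks) used for shuffles in Spark SQL; only affects Spark SQL
  • spark.default.parallelism
    Default parallelism for general (RDD) Spark jobs
  • spark.dynamicAllocation.enabled
    Default false; lets Spark add and remove executors dynamically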
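Because the overhead sits on top of the heap, the memory YARN has to grant per executor is larger than spark.executor.memory. Here is a minimal Python sketch of that arithmetic. It assumes spark.executor.memoryOverhead is left unset, so the overhead is max(factor * memory, 384MB), and it assumes off-heap and PySpark memory are 0. `executor_container_mb` is a hypothetical helper, not a Spark API:

```python
def executor_container_mb(executor_memory_mb, overhead_factor=0.1, min_overhead_mb=384):
    """Memory (MB) YARN must grant for one executor: heap + overhead.

    Assumes spark.executor.memoryOverhead is not set explicitly and that
    off-heap and PySpark memory are 0.
    """
    # Overhead defaults to factor * heap, but never less than 384MB.
    overhead_mb = max(int(executor_memory_mb * overhead_factor), min_overhead_mb)
    return executor_memory_mb + overhead_mb


for mem_mb in (1024, 2048, 5120):
    print(f"{mem_mb}MB heap -> {executor_container_mb(mem_mb)}MB container")
```

For a 2048MB executor this gives 2432MB. That is the same heap + 384MB total that the YARN error further down complains about.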

Running the data sync with Spark on YARN

MySQL is reached over the internal network; available resources: 8 cores, 16GB
Data volume: 60 million rows

CREATE TEMPORARY VIEW lineorder_view
USING org.apache.spark.sql.jdbc
OPTIONS (
    driver 'com.mysql.cj.jdbc.Driver',
    url 'jdbc:mysql://172.16.215.84:30306/isxcode_db',
    user 'root',
    password 'Zhiqingyun@isxcode',
    dbtable 'lineorder'
);

insert into isxcode_db.lineorder select * from lineorder_view;
Config 1
{
  "hive.metastore.uris": "thrift://isxcode:9083",
  "spark.memory.fraction": 0.6,
  "spark.executor.instances": 2,
  "spark.default.parallelism": 10,
  "spark.sql.shuffle.partitions": 200,
  "spark.executor.cores": 1,
  "spark.executor.memory": "1g",
  "spark.executor.memoryOverheadFactor": 0.1,
  "spark.executor.heartbeatInterval": "10s",
  "spark.driver.memory": "1g",
  "spark.driver.cores": 1,
  "spark.network.timeout": "120s",
  "spark.dynamicAllocation.enabled": true
}
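JSON configs like this are easy to get subtly wrong. Python's `json.loads` silently keeps only the last value of a repeated key, and a trailing comma makes the whole object invalid. The sketch below assumes the job config is plain JSON like the block above. It rejects duplicate keys and turns the result into spark-submit `--conf key=value` arguments. The helper names are hypothetical. Prefixing non-`spark.` keys with `spark.hadoop.` is an assumption about how such keys reach Hive, since spark-submit ignores `--conf` keys that do not start with `spark.`:

```python
import json


def _reject_duplicates(pairs):
    """object_pairs_hook that fails instead of keeping the last duplicate."""
    conf = {}
    for key, value in pairs:
        if key in conf:
            raise ValueError(f"duplicate config key: {key}")
        conf[key] = value
    return conf


def load_spark_conf(text):
    """Parse a JSON job config; trailing commas and repeated keys raise ValueError."""
    return json.loads(text, object_pairs_hook=_reject_duplicates)


def to_submit_args(conf):
    """Render the config as spark-submit style ["--conf", "key=value", ...] arguments."""
    args = []
    for key, value in conf.items():
        if isinstance(value, bool):
            value = "true" if value else "false"  # Spark expects lowercase booleans
        if not key.startswith("spark."):
            # Assumption: non-Spark keys such as hive.metastore.uris are passed
            # through the Hadoop configuration via the spark.hadoop. prefix.
            key = "spark.hadoop." + key
        args += ["--conf", f"{key}={value}"]
    return args


conf = load_spark_conf("""{
  "hive.metastore.uris": "thrift://isxcode:9083",
  "spark.executor.instances": 2,
  "spark.dynamicAllocation.enabled": true
}""")
print(to_submit_args(conf))
```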

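Conclusions:

spark.executor.instances determines how many vcores are requested from YARN
The SQL reads the JDBC table without any partitioning options, so the read runs as a single task and only one executor actually does any work
Executor storage memory is shown as 366.3MB, which covers the whole unified (execution + storage) region. The overhead (384MB) lives outside the heap, so (1024 - 384) * 60% ≈ 384MB is not the right formula. Spark computes (usable heap - 300MB reserved) * spark.memory.fraction, and the JVM reports a usable heap somewhat below the 1g setting. That is why the figure is below (1024 - 300) * 60% ≈ 434MB
Driver storage memory is 366.3MB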
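Spark's unified memory model sizes the execution + storage region at (usable heap - 300MB reserved) * spark.memory.fraction. Below is a sketch of that formula. The `usable_ratio` of 0.89 is not a Spark setting. It is an empirical assumption backed out of the 366.3MB / 2.5GB / 5.2GB readings in this post. The JVM reports a max heap a bit below -Xmx, and the exact gap depends on the JVM and the garbage collector:

```python
RESERVED_MB = 300  # fixed system reservation in Spark's unified memory model


def unified_memory_mb(usable_heap_mb, memory_fraction=0.6):
    """Execution + storage region: (usable heap - 300MB) * spark.memory.fraction."""
    return (usable_heap_mb - RESERVED_MB) * memory_fraction


def estimate_from_setting(executor_memory_mb, memory_fraction=0.6, usable_ratio=0.89):
    """Rough estimate starting from spark.executor.memory.

    usable_ratio is an ASSUMPTION fitted to the readings in this post, not a
    Spark setting: the JVM reports a max heap somewhat below -Xmx.
    """
    return unified_memory_mb(executor_memory_mb * usable_ratio, memory_fraction)


# 910.5MB is the usable heap that the observed 366.3MB implies for a 1g executor.
print(round(unified_memory_mb(910.5), 1))  # 366.3
for gb in (5, 10):
    print(f"{gb}g -> ~{estimate_from_setting(gb * 1024) / 1024:.1f}GB")
```

The loop prints roughly 2.5GB for 5g and 5.2GB for 10g, in line with configs 2 and 3.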
Config 2
{
  "hive.metastore.uris": "thrift://isxcode:9083",
  "spark.memory.fraction": 0.6,
  "spark.executor.instances": 2,
  "spark.default.parallelism": 10,
  "spark.sql.shuffle.partitions": 200,
  "spark.executor.cores": 1,
  "spark.executor.memory": "5g",
  "spark.executor.memoryOverheadFactor": 0.1,
  "spark.executor.heartbeatInterval": "10s",
  "spark.driver.memory": "1g",
  "spark.driver.cores": 1,
  "spark.network.timeout": "120s",
  "spark.dynamicAllocation.enabled": true
}

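Conclusions:

Executor storage memory is 2.5GB. The estimate (5 - 5 * 10%) * 60% ≈ 2.7GB is about 200MB too high because the overhead is not part of the heap; Spark uses (usable heap - 300MB) * 0.6
Driver storage memory is 366.3MB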
Config 3
{
  "hive.metastore.uris": "thrift://isxcode:9083",
  "spark.memory.fraction": 0.6,
  "spark.executor.instances": 1,
  "spark.default.parallelism": 10,
  "spark.sql.shuffle.partitions": 200,
  "spark.executor.cores": 1,
  "spark.executor.memory": "10g",
  "spark.executor.memoryOverheadFactor": 0.1,
  "spark.executor.heartbeatInterval": "10s",
  "spark.driver.memory": "1g",
  "spark.driver.cores": 1,
  "spark.network.timeout": "120s",
  "spark.dynamicAllocation.enabled": true
}

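Conclusions:

Executor storage memory is 5.2GB. The estimate (10 - 10 * 10%) * 60% ≈ 5.4GB is about 200MB too high, for the same reason as in config 2
Driver storage memory is 366.3MB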
Config 4
{
  "hive.metastore.uris": "thrift://isxcode:9083",
  "spark.memory.fraction": 0.8,
  "spark.executor.instances": 1,
  "spark.default.parallelism": 10,
  "spark.sql.shuffle.partitions": 200,
  "spark.executor.cores": 1,
  "spark.executor.memory": "12g",
  "spark.executor.memoryOverheadFactor": 0.1,
  "spark.executor.heartbeatInterval": "10s",
  "spark.driver.memory": "1g",
  "spark.driver.cores": 1,
  "spark.network.timeout": "120s",
  "spark.dynamicAllocation.enabled": true
}

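Conclusions:

Executor storage memory is reported as 5.2GB, the same figure as config 3. Note that (12 - 12 * 10%) * 60% is ≈ 6.5GB, not 5.4GB. With spark.memory.fraction set to 0.8, (usable heap - 300MB) * 0.8 predicts roughly 8GB or more, so this reading does not match the configuration and is worth re-measuring
Driver storage memory is 366.3MB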
Data sync

60 million rows split into 300 partitions over the range 1 to 60000000, i.e. 200,000 rows per partition

CREATE TEMPORARY VIEW lineorder_view
USING org.apache.spark.sql.jdbc
OPTIONS (
    driver 'com.mysql.cj.jdbc.Driver',
    url 'jdbc:mysql://test-dev:30306/default_db',
    user 'root',
    password 'ispong123',
    dbtable 'lineorder',
    numPartitions  300,
    partitionColumn 'lo_orderkey',
    lowerBound 1,
    upperBound 60000000
);

insert into lineorder select * from lineorder_view;
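With partitionColumn, lowerBound, upperBound and numPartitions set, Spark's JDBC source splits the read into one query (and one task) per partition. The sketch below is modeled on the stride logic in Spark's JDBC relation and shows the WHERE clauses it generates. It assumes non-negative bounds and more than one partition, and it skips Spark's adjustment for ranges narrower than numPartitions. `jdbc_where_clauses` is a hypothetical helper:

```python
def jdbc_where_clauses(column, lower, upper, num_partitions):
    """Per-partition WHERE clauses for a partitioned JDBC read.

    Modeled on Spark's stride logic. Assumes non-negative bounds and skips
    Spark's adjustment for ranges narrower than num_partitions.
    """
    if num_partitions < 2:
        return [None]  # one partition, no WHERE clause
    stride = upper // num_partitions - lower // num_partitions
    clauses = []
    current = lower
    for i in range(num_partitions):
        lo = f"{column} >= {current}" if i != 0 else None
        current += stride
        hi = f"{column} < {current}" if i != num_partitions - 1 else None
        if lo is None:
            # The first partition also picks up NULLs and values below lowerBound.
            clauses.append(f"{hi} or {column} is null")
        elif hi is None:
            # The last partition picks up everything from its start upward, even above upperBound.
            clauses.append(lo)
        else:
            clauses.append(f"{lo} AND {hi}")
    return clauses


clauses = jdbc_where_clauses("lo_orderkey", 1, 60000000, 300)
print(len(clauses))   # 300
print(clauses[0])     # lo_orderkey < 200001 or lo_orderkey is null
print(clauses[-1])    # lo_orderkey >= 59800001
```

The bounds only set the stride. Rows outside 1 to 60000000 are not filtered out; they land in the first or last partition. With numPartitions 30 the stride grows to 2,000,000 rows per partition.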

YARN: 16 cores, 20GB
Spark: 15 executors plus 1 driver
Each executor gets 1GB of memory
Driver: 1GB of memory
There should be only one stage
Concurrent task slots = number of executors * cores per executor
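Each partition becomes one task, and each executor core runs one task at a time (with the default spark.task.cpus = 1). Here is a quick sketch of the slot arithmetic; the helper names are hypothetical:

```python
import math


def task_slots(executor_instances, executor_cores, task_cpus=1):
    """Tasks that can run at once: executors * (cores // spark.task.cpus)."""
    return executor_instances * (executor_cores // task_cpus)


def waves(num_partitions, slots):
    """How many tasks each slot works through one after another."""
    return math.ceil(num_partitions / slots)


slots = task_slots(15, 1)
print(slots, waves(300, slots))  # 15 20
```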

{
  "hive.metastore.uris": "thrift://isxcode:9083",
  "spark.executor.instances": 15,
  "spark.sql.shuffle.partitions": 400,
  "spark.executor.cores": 1,
  "spark.executor.memory":"1g",
  "spark.driver.cores": 1, 
  "spark.driver.memory": "1g"
}

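15 tasks run at the same time, each with 1g of memory and 200,000 rows to process; each task slot works through 20 tasks (300 partitions / 15 slots)
Things to check:
YARN's resource allocation
storage on the production server
memory and CPU on the production server
whether one failing task makes the whole job fail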

Production test

16 cores, 20GB
1GB per core
20GB across 16 cores


{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 1,
  "spark.executor.memory": "2g",
  "spark.driver.cores": 1,
  "spark.driver.memory": "1g"
}
YARN allocated 7 cores and 20GB

// Only 7 YARN containers were started, leaving just 6 executors; 2000 rows processed in 1.2 minutes


{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 2,
  "spark.executor.memory": "1g",
  "spark.driver.cores": 1,
  "spark.driver.memory": "1g"
}
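YARN allocated 10 cores and 20GB

// Given the partition size, a single task does not need 2GB
// 18 task slots = 9 executors * 2 cores. There are still only 9 executors (that is what fits in the 19GB left after the driver's 1GB); each one just runs 2 tasks at a time
// Still 2000 rows per minute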


{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 5,
  "spark.executor.memory": "1g",
  "spark.driver.cores": 1,
  "spark.driver.memory": "1g"
}
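YARN allocated 10 cores and 20GB

// Push concurrency higher; memory consumption is actually modest
// By default it starts 1 core, 2g as a single executor node
// 9 executors, 45 task slots
// Cores per executor should not exceed 5
// CPU maxed out
// Possibly it was overloaded right at startup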


{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 3,
  "spark.executor.memory": "1g",
  "spark.driver.cores": 1,
  "spark.driver.memory": "1g"
}

// CPU maxed out


{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 2,
  "spark.executor.memory": "1g",
  "spark.driver.cores": 1,
  "spark.driver.memory": "1g"
}

2000 rows processed in 2 minutes
This looks like the ceiling


{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 1,
  "spark.executor.memory":"1g",
  "spark.driver.cores": 1, 
  "spark.driver.memory": "1g"
}
CREATE TEMPORARY VIEW lineorder_view
USING org.apache.spark.sql.jdbc
OPTIONS (
    driver 'com.mysql.cj.jdbc.Driver',
    url 'jdbc:mysql://test-dev:30306/default_db',
    user 'root',
    password 'ispong123',
    dbtable 'lineorder',
    numPartitions  3000,
    partitionColumn 'lo_orderkey',
    lowerBound 1,
    upperBound 60000000
);

insert into lineorder select * from lineorder_view;

Smaller partitions (20,000 rows each)
9 * 20,000 rows every 1.5 minutes


Reduce the number of partitions further

CREATE TEMPORARY VIEW lineorder_view
USING org.apache.spark.sql.jdbc
OPTIONS (
    driver 'com.mysql.cj.jdbc.Driver',
    url 'jdbc:mysql://test-dev:30306/default_db',
    user 'root',
    password 'ispong123',
    dbtable 'lineorder',
    numPartitions  300,
    partitionColumn 'lo_orderkey',
    lowerBound 1,
    upperBound 60000000
);

insert into lineorder select * from lineorder_view;

9 * 200,000 rows every 1.7 minutes; CPU usage drops noticeably


// Reduce the number of partitions again

CREATE TEMPORARY VIEW lineorder_view
USING org.apache.spark.sql.jdbc
OPTIONS (
    driver 'com.mysql.cj.jdbc.Driver',
    url 'jdbc:mysql://test-dev:30306/default_db',
    user 'root',
    password 'ispong123',
    dbtable 'lineorder',
    numPartitions  30,
    partitionColumn 'lo_orderkey',
    lowerBound 1,
    upperBound 60000000
);

insert into lineorder select * from lineorder_view;

Executors crashed, probably from running out of memory (each of the 30 partitions now holds 2,000,000 rows)


Increase executor memory

{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 1,
  "spark.executor.memory":"2g",
  "spark.driver.cores": 1, 
  "spark.driver.memory": "1g"
}

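YARN allocated 7 cores and 20GB; the maximum container size had been set too high (14GB)
6 * 2,000,000 rows every 2 minutes, full sync done in 10 minutes; CPU at 50%, memory fully used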


Limit a single YARN container to at most 1 core, 2GB

// Error:
// Required executor memory (2048 MB), offHeap memory (0) MB, overhead (384 MB), and PySpark memory (0 MB) is above the max threshold (2048 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.
// For a 2GB executor, YARN's single-container maximum must be at least 2GB + 384MB
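The check behind this error can be sketched like this. Heap + overhead, plus off-heap and PySpark memory (both 0 here), must fit under the largest container YARN will grant. The sketch is modeled on the error message, not copied from Spark's source; `check_yarn_request` is a hypothetical helper:

```python
def check_yarn_request(executor_memory_mb, max_allocation_mb, overhead_mb=384,
                       offheap_mb=0, pyspark_mb=0):
    """Raise ValueError if one executor container exceeds YARN's per-container cap.

    Modeled on the error above: heap + off-heap + overhead + PySpark memory
    must not exceed the cluster's maximum allocation.
    """
    required_mb = executor_memory_mb + offheap_mb + overhead_mb + pyspark_mb
    if required_mb > max_allocation_mb:
        raise ValueError(
            f"executor needs {required_mb} MB, above the max threshold "
            f"({max_allocation_mb} MB) of this cluster"
        )
    return required_mb


print(check_yarn_request(2048, 2560))  # 2432: fits under a 2560MB cap
```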


Raise YARN's single-container maximum to 1 core, 2.5GB (2560MB)
YARN cannot start more than 20 / 2.5 = 8 containers
which means at most 8 cores can be used

7 * 2,000,000 rows synced every 2.2 minutes
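Here is a rough model of how many executors YARN can actually place. Each request is rounded up to a multiple of yarn.scheduler.minimum-allocation-mb (CapacityScheduler's usual normalization), the ApplicationMaster container is granted first, and the 20GB is treated as a single pool. Several inputs are assumptions not stated in the post: the minimum allocation (1024MB, the YARN default, for the earlier runs and 512MB after the container limits were changed), and cluster mode, so that the AM holds the 1g driver. Under those assumptions the arithmetic lines up with the 6, 9 and 7 executors observed in this post. The helper names are hypothetical:

```python
import math


def normalize_mb(request_mb, min_allocation_mb):
    """Round a request up to a multiple of yarn.scheduler.minimum-allocation-mb."""
    return math.ceil(request_mb / min_allocation_mb) * min_allocation_mb


def executors_that_fit(total_mb, executor_request_mb, am_request_mb, min_allocation_mb):
    """Executors YARN can place once the ApplicationMaster container is granted.

    Treats the cluster memory as a single pool (no per-node fragmentation).
    """
    am_mb = normalize_mb(am_request_mb, min_allocation_mb)
    executor_mb = normalize_mb(executor_request_mb, min_allocation_mb)
    return max((total_mb - am_mb) // executor_mb, 0)


# Requests: 2g executor = 2048 + 384 = 2432MB; 1g executor or 1g driver = 1024 + 384 = 1408MB.
print(executors_that_fit(20480, 2432, 1408, 1024))  # 6 (2g executors, assumed 1024MB minimum)
print(executors_that_fit(20480, 1408, 1408, 1024))  # 9 (1g executors)
print(executors_that_fit(20480, 2432, 1408, 512))   # 7 (2g executors, assumed 512MB minimum)
```

In the last case each executor container rounds up to exactly 2560MB, the single-container cap set above.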


Lower the memory to get more concurrency

{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 1,
  "spark.executor.memory":"1g",
  "spark.driver.cores": 1, 
  "spark.driver.memory": "1g"
}

YARN: 10 cores, 20GB

// Failed: memory too small to hold the data


{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 1,
  "spark.executor.memory":"1.5g",
  "spark.driver.cores": 1, 
  "spark.driver.memory": "1g"
}

// Failed to start: Spark memory sizes must be a whole number with a unit, so "1.5g" is rejected
// 2GB of memory handles 2,000,000 rows
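Spark memory settings take a whole number plus a unit, so 1.5GB has to be written in MiB instead. Here is a tiny hypothetical helper for the conversion:

```python
def to_spark_memory(gib):
    """Express a (possibly fractional) GiB amount as a whole-number Spark size string.

    Spark rejects fractional sizes such as "1.5g", so fall back to MiB.
    """
    mib = round(gib * 1024)
    if mib % 1024 == 0:
        return f"{mib // 1024}g"
    return f"{mib}m"


print(to_spark_memory(1.5))  # 1536m
```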


Reduce YARN's cores
Set the total number of cores to 8

{
  "hive.metastore.uris": "thrift://test-dev:9083",
  "spark.executor.instances": 15,
  "spark.executor.cores": 1,
  "spark.executor.memory":"2g",
  "spark.driver.cores": 1, 
  "spark.driver.memory": "1g"
}

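7 executors, each syncing 2,000,000 rows every 2 minutes: 14 million rows every 2 minutes
60 million rows synced in 9 minutes at 64% CPU; just about acceptable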
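Here is a back-of-envelope estimate of total sync time from the per-executor rate. It assumes every executor keeps finishing one 2,000,000-row partition every couple of minutes, and it ignores the uneven last wave of tasks. `eta_minutes` is a hypothetical helper:

```python
def eta_minutes(total_rows, executors, rows_per_partition, minutes_per_partition):
    """Rough total sync time: total rows / cluster-wide rows per minute.

    Assumes every executor finishes one partition every minutes_per_partition
    minutes and ignores the uneven last wave of tasks.
    """
    rows_per_minute = executors * rows_per_partition / minutes_per_partition
    return total_rows / rows_per_minute


print(f"{eta_minutes(60_000_000, 7, 2_000_000, 2.0):.1f} min")  # 8.6 min
```

With 6 executors at 2 minutes per partition the same formula gives 10 minutes, matching the earlier 6-executor run. With 7 executors it gives about 8.6 minutes, close to the 9 minutes observed.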


SSB stress test
Shuffle concurrency

https://ispong.isxcode.com/hadoop/spark/spark 数据同步/
Author
ispong
Posted on
September 11, 2023