spark 数据异常排查
Last updated on January 17, 2025 am
🧙 Questions
☄️ Ideas
相关联的表数据条数记录
时间节点 11:11
- ods_finance_demdm_s_inven_initial_list_crocom_g_df 5090
- ods_finance_baan_twhina112101_df 20778206
- ods_finance_demdm_s_quantity_corrrction_g_df 23
- ods_finance_baan_ttcmcs003101_df 209
- ods_finance_baan_twhinr110101_df 118765392
- ods_finance_baan_twhina114101_df 108311964
- 入库变化值 0
- 出库变化值 0
- ispong_test02
查一下是不是有人把dev的db提交上去了 没有
A (可能会多)
compare temp --> 对比 compare total --> insert into total (一条数据)
B (可能会多)
-- A表
-- ods_finance_baan_twhina112101_df
select count(1) from ods_finance_baan_twhina112101_df 20778206 没问题 20791632
-- ods_finance_baan_twhinr110101_df
select count(1) from ods_finance_baan_twhinr110101_df 118765392 没问题 118819056
-- ods_finance_baan_ttcmcs003101_df
select count(1) from ods_finance_baan_ttcmcs003101_df 209 没问题 209
-- ods_finance_demdm_s_inven_initial_list_crocom_g_df
select count(1) from ods_finance_demdm_s_inven_initial_list_crocom_g_df 5090 没问题 5090
-- ods_finance_demdm_s_quantity_corrrction_g_df
select count(1) from ods_finance_demdm_s_quantity_corrrction_g_df 23 没问题 23
insert
overwrite ispong_test02.sa_temp_A_104(
SELECT
'ERP104' as UNIT_CODE,
st.cwar,
st.item,
'' as dsca,
st.qstk,
st.qstk as REMAINDER,
st.itid,
st.trdt
FROM
ispong_test02.ods_finance_baan_twhina112101_df st
left join ispong_test02.ods_finance_baan_twhinr110101_df st3 on st.ITID = st3.ITID
and st3.ITSE = st.ITSE
left join ispong_test02.ods_finance_baan_ttcmcs003101_df sa on st.CWAR = sa.CWAR
WHERE
st3.clot = ' '
and cast(st.OWNS as decimal(20, 4)) = 10
and cast(st.QSTK as decimal(20, 4)) <> 0
and sa.comp = 104
and st.orno <> 'W01466349'
and st.orno <> 'W01466350'
and st.orno <> 'W01466351'
and st.orno <> 'W01466352'
and st.orno <> 'W01466353'
and st.orno <> 'W01466354'
and st.orno <> 'W01466355'
and st.orno <> 'W01466356'
and st.orno <> 'W01466357'
and st.orno <> 'W01466358'
and st.orno <> 'W01466359'
and from_unixtime(unix_timestamp(st.TRDT) + 28800, 'yyyy-MM-dd') < current_date()
UNION ALL
SELECT
'ERP104' as UNIT_CODE,
st.WAREHOUSE_CODE,
st.ITEM_CODE,
'' as dsca,
st.INBOUND_AMOUNT,
st.INBOUND_AMOUNT as REMAINDER,
st.INVENTORY_TRAN_IDENT,
st.INBOUND_TIME
FROM
ispong_test02.ods_finance_demdm_s_inven_initial_list_crocom_g_df st
UNION ALL
SELECT
UNIT_CODE,
cwar,
concat(' ', item) as item,
dsca,
qstk,
qstk as REMAINDER,
itid,
trdt
from
ispong_test02.ods_finance_demdm_s_quantity_corrrction_g_df
WHERE
KOST = '3'
and UNIT_CODE = 'ERP104'
);
-- 同步前 1151425
-- 至轻云 1151939
-- 执行后 1151939
select count(1) from ispong_test02.sa_temp_A_104
-- sa_temp_B_104
-- 这两张表可能数据异常
-- B表
-- ods_finance_baan_twhina114101_df
select count(1) from ispong_test02.ods_finance_baan_twhina114101_df 108311964
-- ods_finance_baan_twhinr110101_df
select count(1) from ispong_test02.ods_finance_baan_twhinr110101_df 118765392
-- ods_finance_baan_ttcmcs003101_df
select count(1) from ispong_test02.ods_finance_baan_ttcmcs003101_df 209
-- ods_finance_demdm_s_quantity_corrrction_g_df
select count(1) from ispong_test02.ods_finance_demdm_s_quantity_corrrction_g_df 23
insert
overwrite table ispong_test02.sa_temp_B_104
SELECT
'ERP104' as UNIT_CODE,
st.cwar,
st.item,
'' as dsca,
st.qstk,
st.qstk as REMAINDER,
st.itid,
st.CTDT
FROM
ispong_test02.ods_finance_baan_twhina114101_df st
left JOIN ispong_test02.ods_finance_baan_twhinr110101_df st3 on st.ITID = st3.ITID
and st.ITSE = st3.ITSE
left join ispong_test02.ods_finance_baan_ttcmcs003101_df sa on st.CWAR = sa.CWAR
WHERE
st3.clot = ' '
and cast(st.OWNS as decimal(20, 4)) = 10
and cast(st.QSTK as decimal(20, 4)) <> 0
and sa.comp = 104
and from_unixtime(unix_timestamp(st.CTDT) + 28800, 'yyyy-MM-dd') < current_date()
UNION ALL
SELECT
UNIT_CODE,
cwar,
concat(' ', item) as item,
dsca,
qstk,
qstk as REMAINDER,
itid,
trdt
from
ispong_test02.ods_finance_demdm_s_quantity_corrrction_g_df
WHERE
KOST = '5'
and UNIT_CODE = 'ERP104';
-- 至轻云 7190849
-- ispong 7190849
select count(1) from ispong_test02.sa_temp_B_104
-- sa_compare_temp_104 临时表数据数据应该不对
insert
overwrite table ispong_test02.sa_compare_temp_104
SELECT
a.cwar,
a.item
FROM
(
select
cwar,
item,
cast(sum(qstk) as decimal(20,4)) as qstk
from
ispong_test02.sa_temp_A_104
group by
cwar,
item
) a
left join (
select
cwar,
item,
cast(sum(qstk) as decimal(20,4)) as qstk
from
ispong_test02.sa_temp_B_104
group by
cwar,
item
) b on a.cwar = b.cwar
and a.item = b.item
WHERE
a.qstk <> ifnull(b.qstk, 0)
group by
a.cwar,
a.item;
-- 至轻云 45183
-- ispong 45183
select count(1) from ispong_test02.sa_compare_temp_104
-- 对比两张表 sa_compare_temp_104 和 sa_compare_total_104
-- 找出total中没有的数据
insert
overwrite table ispong_test02.sa_compare_change_104
SELECT
*
FROM
ispong_test02.sa_compare_temp_104
EXCEPT
SELECT
*
FROM
ispong_test02.sa_compare_total_104;
-- 至轻云 45183
-- ispong 45183
select count(1) from ispong_test02.sa_compare_change_104
-- 不应该有这一条数据
select count(1) from ispong_test02.sa_compare_change_104 where item like '%1003001GH200BJ%'
-- 把没有的数据同步到 total中
insert into
ispong_test02.sa_compare_total_104
select
*
from
ispong_test02.sa_compare_change_104;
-- 至轻云 45183
-- ispong 45183
select count(1) from ispong_test02.sa_compare_total_104;
-- dwd_finance_baan_ttcibd001101_df 250907 没问题
select count(1) from ispong_test02.dwd_finance_baan_ttcibd001101_df
-- 输出到 物料出库变化
insert
overwrite table ispong_test02.sa_change_B_104
SELECT
t1.UNIT_CODE,
t1.CWAR,
t1.ITEM,
t2.DSCA,
t1.QSTK,
t1.REMAINDER,
t1.ITID,
t1.CTDT
from
(
SELECT
st.UNIT_CODE,
st.CWAR,
st.ITEM,
st.QSTK,
st.REMAINDER,
st.ITID,
st.CTDT
FROM
ispong_test02.sa_temp_B_104 st
left join ispong_test02.sa_compare_total_104 sc on st.item = sc.item
and st.cwar = sc.cwar
WHERE
sc.item is not NULL
EXCEPT
ALL
SELECT
UNIT_CODE,
CWAR,
ITEM,
QSTK,
REMAINDER,
ITID,
CTDT
FROM
ispong_test02.sa_total_B_104
) t1
left join ispong_test02.dwd_finance_baan_ttcibd001101_df t2 on t1.item = t2.item
WHERE
t2.dsca is not NULL;
-- 至轻云 4379147
-- ispong 4379147
select count(1) from ispong_test02.sa_change_B_104
-- 出库 250907 没问题
select count(1) from ispong_test02.dwd_finance_baan_ttcibd001101_df
insert
overwrite table ispong_test02.sa_change_A_104
SELECT
t1.UNIT_CODE,
t1.CWAR,
t1.ITEM,
t2.DSCA,
t1.QSTK,
t1.REMAINDER,
t1.ITID,
t1.TRDT
from
(
SELECT
st.UNIT_CODE,
st.CWAR,
st.ITEM,
st.QSTK,
st.REMAINDER,
st.ITID,
st.TRDT
FROM
ispong_test02.sa_temp_A_104 st
left join ispong_test02.sa_compare_total_104 sc on st.item = sc.item
and st.cwar = sc.cwar
WHERE
sc.item is not NULL
EXCEPT
ALL
SELECT
UNIT_CODE,
CWAR,
ITEM,
QSTK,
REMAINDER,
ITID,
TRDT
FROM
ispong_test02.sa_total_A_104
) t1
left join ispong_test02.dwd_finance_baan_ttcibd001101_df t2 on t1.item = t2.item
WHERE
t2.dsca is not NULL;
-- ispong 551723
-- 至轻云 551723
select count(1) from ispong_test02.sa_change_A_104;
取物料出库明细104
结果表: sa_temp_B_104 临时记录出库明细
来源表:
1. ods_finance_baan_twhina114101_df
2. ods_finance_baan_twhinr110101_df
3. ods_finance_baan_ttcmcs003101_df
4. ods_finance_demdm_s_quantity_corrrction_g_df
每次执行前记录数据条数
insert
overwrite table sa_temp_B_104
SELECT
'ERP104' as UNIT_CODE,
st.cwar,
st.item,
'' as dsca,
st.qstk,
st.qstk as REMAINDER,
st.itid,
st.CTDT
FROM
ods_finance_baan_twhina114101_df st
left JOIN ods_finance_baan_twhinr110101_df st3 on st.ITID = st3.ITID
and st.ITSE = st3.ITSE
left join ods_finance_baan_ttcmcs003101_df sa on st.CWAR = sa.CWAR
WHERE
st3.clot = ' '
and cast(st.OWNS as decimal(20, 4)) = 10
and cast(st.QSTK as decimal(20, 4)) <> 0
and sa.comp = 104
and from_unixtime(unix_timestamp(st.CTDT) + 28800, 'yyyy-MM-dd') < current_date()
UNION ALL
SELECT
UNIT_CODE,
cwar,
concat(' ', item) as item,
dsca,
qstk,
qstk as REMAINDER,
itid,
trdt
from
ods_finance_demdm_s_quantity_corrrction_g_df
WHERE
KOST = '5'
and UNIT_CODE = 'ERP104'
取物料入库明细104
结果表: sa_temp_A_104
来源表:
1. ods_finance_baan_twhina112101_df
2. ods_finance_baan_twhinr110101_df
3. ods_finance_baan_ttcmcs003101_df
4. ods_finance_demdm_s_inven_initial_list_crocom_g_df
5. ods_finance_demdm_s_quantity_corrrction_g_df
Insert
overwrite sa_temp_A_104(
SELECT
'ERP104' as UNIT_CODE,
st.cwar,
st.item,
'' as dsca,
st.qstk,
st.qstk as REMAINDER,
st.itid,
st.trdt
FROM
ods_finance_baan_twhina112101_df st
left join ods_finance_baan_twhinr110101_df st3 on st.ITID = st3.ITID
and st3.ITSE = st.ITSE
left join ods_finance_baan_ttcmcs003101_df sa on st.CWAR = sa.CWAR
WHERE
st3.clot = ' '
and cast(st.OWNS as decimal(20, 4)) = 10
and cast(st.QSTK as decimal(20, 4)) <> 0
and sa.comp = 104
and st.orno <> 'W01466349'
and st.orno <> 'W01466350'
and st.orno <> 'W01466351'
and st.orno <> 'W01466352'
and st.orno <> 'W01466353'
and st.orno <> 'W01466354'
and st.orno <> 'W01466355'
and st.orno <> 'W01466356'
and st.orno <> 'W01466357'
and st.orno <> 'W01466358'
and st.orno <> 'W01466359'
and from_unixtime(unix_timestamp(st.TRDT) + 28800, 'yyyy-MM-dd') < current_date()
UNION ALL
SELECT
'ERP104' as UNIT_CODE,
st.WAREHOUSE_CODE,
st.ITEM_CODE,
'' as dsca,
st.INBOUND_AMOUNT,
st.INBOUND_AMOUNT as REMAINDER,
st.INVENTORY_TRAN_IDENT,
st.INBOUND_TIME
FROM
ods_finance_demdm_s_inven_initial_list_crocom_g_df st
UNION ALL
SELECT
UNIT_CODE,
cwar,
concat(' ', item) as item,
dsca,
qstk,
qstk as REMAINDER,
itid,
trdt
from
ods_finance_demdm_s_quantity_corrrction_g_df
WHERE
KOST = '3'
and UNIT_CODE = 'ERP104'
)
物料出入库数量计算104
结果表 table sa_compare_temp_104
来源表
1. sa_temp_A_104
2. sa_temp_B_104
Insert
overwrite table sa_compare_temp_104
SELECT
a.cwar,
a.item
FROM
(
select
cwar,
item,
cast(sum(qstk) as decimal(20,4)) as qstk
from
sa_temp_A_104
group by
cwar,
item
) a
left join (
select
cwar,
item,
cast(sum(qstk) as decimal(20,4)) as qstk
from
sa_temp_B_104
group by
cwar,
item
) b on a.cwar = b.cwar
and a.item = b.item
WHERE
a.qstk <> ifnull(b.qstk, 0)
group by
a.cwar,
a.item
物料种类变化计算104
结果表 table sa_compare_change_104
来源表
1. sa_compare_temp_104
2. sa_compare_total_104
Insert
overwrite table sa_compare_change_104
SELECT
*
FROM
sa_compare_temp_104
EXCEPT
SELECT
*
FROM
sa_compare_total_104
物料汇总计算104
结果表 sa_compare_total_104
来源表
1. sa_compare_change_104
insert into
sa_compare_total_104
select
*
from
sa_compare_change_104
物料出库变化值计算104
结果表 sa_change_B_104
来源表:
1. sa_temp_B_104
2. sa_compare_total_104
3. sa_total_B_104
4. dwd_finance_baan_ttcibd001101_df
Insert
overwrite table sa_change_B_104
SELECT
t1.UNIT_CODE,
t1.CWAR,
t1.ITEM,
t2.DSCA,
t1.QSTK,
t1.REMAINDER,
t1.ITID,
t1.CTDT
from
(
SELECT
st.UNIT_CODE,
st.CWAR,
st.ITEM,
st.QSTK,
st.REMAINDER,
st.ITID,
st.CTDT
FROM
sa_temp_B_104 st
left join sa_compare_total_104 sc on st.item = sc.item
and st.cwar = sc.cwar
WHERE
sc.item is not NULL
EXCEPT
ALL
SELECT
UNIT_CODE,
CWAR,
ITEM,
QSTK,
REMAINDER,
ITID,
CTDT
FROM
sa_total_B_104
) t1
left join dwd_finance_baan_ttcibd001101_df t2 on t1.item = t2.item
WHERE
t2.dsca is not NULL
SELECT
count(1)
from
(
SELECT
st.UNIT_CODE,
st.CWAR,
st.ITEM,
st.QSTK,
st.REMAINDER,
st.ITID,
st.CTDT
FROM
sa_temp_B_104 st
left join sa_compare_total_104 sc on st.item = sc.item
and st.cwar = sc.cwar
WHERE
sc.item is not NULL
EXCEPT
ALL
SELECT
UNIT_CODE,
CWAR,
ITEM,
QSTK,
REMAINDER,
ITID,
CTDT
FROM
sa_total_B_104
) t1
left join dwd_finance_baan_ttcibd001101_df t2 on t1.item = t2.item
WHERE
t2.dsca is not NULL
物料入库变化值计算104
结果表: sa_change_A_104
来源表:
1. sa_temp_A_104
2. sa_compare_total_104
3. sa_total_A_104
4. dwd_finance_baan_ttcibd001101_df
Insert
overwrite table sa_change_A_104
SELECT
t1.UNIT_CODE,
t1.CWAR,
t1.ITEM,
t2.DSCA,
t1.QSTK,
t1.REMAINDER,
t1.ITID,
t1.TRDT
from
(
SELECT
st.UNIT_CODE,
st.CWAR,
st.ITEM,
st.QSTK,
st.REMAINDER,
st.ITIj
st.TRDT
FROM
sa_temp_A_104 st
left join sa_compare_total_104 sc on st.item = sc.item
and st.cwar = sc.cwar
WHERE
sc.item is not NULL
EXCEPT
ALL
SELECT
UNIT_CODE,
CWAR,
ITEM,
QSTK,
REMAINDER,
ITID,
TRDT
FROM
sa_total_A_104
) t1
left join dwd_finance_baan_ttcibd001101_df t2 on t1.item = t2.item
WHERE
t2.dsca is not NULL
🔗 Links
spark 数据异常排查
https://ispong.isxcode.com/hadoop/spark/spark 数据异常排查/