Productivity Tooling Notes

Hive SQL

Common errors

hive msck repair table error

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask

Fix:
set hive.msck.path.validation=ignore;
msck repair table db_name.table_name;

Column misalignment

-- store the temporary table in ORC format
create table tmp_table stored as orc as select ...
-- or strip the special characters (line breaks, tabs) before use
regexp_replace(room_name,'\\n|\\t|\\r', '') as room_name
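
Putting the two together, a minimal sketch (table and column names are hypothetical):

-- stage the cleansed data in an ORC table so embedded line breaks cannot shift rows
create table tmp_room stored as orc as
select
    room_id,
    regexp_replace(room_name, '\\n|\\t|\\r', '') as room_name
from ods_room_info;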

hadoop Unexpected end of input stream error

# Error message:

2021-05-31 10:57:57,333 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: java.io.IOException: java.io.EOFException: Unexpected end of input stream
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
	at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:231)
	at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.next(HadoopShimsSecure.java:141)
	at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:199)
	at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:185)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:52)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: java.io.EOFException: Unexpected end of input stream
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:355)
	at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:106)
	at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:42)
	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:116)
	at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:228)
	... 11 more
Caused by: java.io.EOFException: Unexpected end of input stream
	at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:145)
	at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
	at java.io.InputStream.read(InputStream.java:101)
	at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:350)
	... 15 more
# Cause
The file path read by the corresponding mapper:

2021-05-31 10:57:56,103 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: Paths:/user/qhstats/ods/ods_hotel_log_breeze/dt=20210528/hour=21/h_hspa_l-qtaspaphy15.h.cn2_out.product.log.2021-05-28-21.gz:0+115093InputFormatClass: org.apache.hadoop.mapred.TextInputFormat
From the stack trace this is an I/O read problem, i.e. the failure happens while the map task is reading its input data.

# Fix
Use explain extended to find the input files being read. Since they are gzip-compressed text files, test them with zcat; this pinpointed a corrupted gz file as the cause. After deleting the bad data the job recovered.
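
For reference, a sketch of using explain extended to see which HDFS files a query reads (table and partition names are taken from the log above and may differ in your environment); each suspicious .gz file listed there can then be checked with zcat:

explain extended
select *
from ods_hotel_log_breeze
where dt = '20210528' and `hour` = '21';
-- the extended plan lists the input paths for each partition/split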

java.lang.OutOfMemoryError: Java heap space error

2021-09-22 16:43:02,967 ERROR [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.orc.impl.OutStream.getNewOutputBuffer(OutStream.java:119)
	at org.apache.orc.impl.OutStream.spill(OutStream.java:172)
	at org.apache.orc.impl.OutStream.write(OutStream.java:149)
	at org.apache.orc.impl.WriterImpl$StringBaseTreeWriter.flushDictionary(WriterImpl.java:1336)
	at org.apache.orc.impl.WriterImpl$StringBaseTreeWriter.createRowIndexEntry(WriterImpl.java:1390)
	at org.apache.orc.impl.WriterImpl$TreeWriter.createRowIndexEntry(WriterImpl.java:844)
	at org.apache.orc.impl.WriterImpl.createRowIndexEntry(WriterImpl.java:2480)
	at org.apache.orc.impl.WriterImpl.flushStripe(WriterImpl.java:2487)
	at org.apache.orc.impl.WriterImpl.close(WriterImpl.java:2787)
	at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:313)
	at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:120)
	at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:190)
	at org.apache.hadoop.hive.ql.exec.FileSinkOperator.closeOp(FileSinkOperator.java:1036)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:683)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
	at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.close(ExecMapper.java:189)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

# Explanation
The stack shows that the map task had already finished reading records from HDFS and ran out of memory while writing them out as an ORC file (the target table uses ORC).
In other words, the failed map task did not split the raw text into rows correctly: the remaining 200,000+ records were treated as one huge row, which then exhausted the heap when converted to ORC.

# Root cause
This usually means the source file has a mishandled row delimiter or contains abnormal characters, so the text cannot be read row by row. The problematic file had already been deleted, and newly generated files keep loading successfully, so the exact defect in the original file could not be analyzed.

# Fix
Locate the problematic data file from the failed map task, move the file away, and re-trigger the load; it then succeeds. Because Hive loads data row by row, moving that file out does not leave the table inconsistent; just reprocess that portion of the data and load it again. Keep the problematic file so you can analyze why it cannot be read into the correct number of rows with the customer-specified row delimiter.
To locate the problematic file from the failed map task:
1. Open the failed map task's log from the JobHistory page, or fetch the log from HDFS under /tmp/logs/bigdata/logs/application_<job id>.
2. Search the failed map task's log for "Processing split"; you will find a line like:
2016-08-09 16:33:19,445 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: Paths:/hzsrc/external/khzl_xj_yd/3aca1f56-d7d0-37c4-b37b-f2a3340182a7/KHZL_XJ_YD@2016070404@append@337d27a0-ce7e-3732-944f-fc2239122b0e.6491959.4880125171.dat:1073741824+134217728,/hzsrc/external/khzl_xj_yd/3aca1f56-d7d0-37c4-b37b-f2a3340182a7/KHZL_XJ_YD@2016070404@append@33f6e047-e5e3-32c0-8407-959282b435b6.7479369.5504391741.dat:402653184+134217728InputFormatClass: org.apache.hadoop.mapred.TextInputFormat
The paths in that log line are the problematic source data files.

# Reference
https://support.huawei.com/enterprise/zh/knowledge/EKB1000243069

Skewed keys

2021-04-07 16:37:00,614 INFO [main] org.apache.hadoop.hive.ql.exec.CommonJoinOperator: table 0 has 16000 rows for join key [eczb9510]

MapredLocalTask OOM in map join caused by hive.auto.convert.join=true

# Error message
ERROR : FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
ERROR : ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask

Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask (state=08S01,code=1)

# Cause
With hive.auto.convert.join=true the join is automatically converted to a map join, and the MapredLocalTask runs out of memory during execution.

# Fix
set hive.auto.convert.join=false

org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded

# Error message
org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded 
 at org.apache.hadoop.io.Text.setCapacity(Text.java:268) 
     at org.apache.hadoop.io.Text.set(Text.java:224) 
     at org.apache.hadoop.io.Text.set(Text.java:214)

# Cause
Typical causes:
1. map/reduce container memory is too small
2. the Hive client heap is too small (usually when a map join is triggered)
3. mapreduce.map/reduce.java.opts is too small (the JVM heap must stay below the corresponding *.memory.mb)

# Fix
Increase the relevant parameters, for example:
set mapreduce.map.memory.mb=9216;
set mapreduce.map.java.opts=-Xmx3276m;
set mapreduce.reduce.memory.mb=9216;
set mapreduce.reduce.java.opts=-Xmx3276m;

Regular expressions

Extract a specific value from a string

coalesce(regexp_extract(params, 'uid=(.*?)&', 1), '') as uid
-- lazy (minimal) match: (.*?)   greedy (maximal) match: (.*)
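
A quick illustration of lazy vs. greedy matching, using a made-up params value:

select regexp_extract('uid=123&src=app&uid=456&', 'uid=(.*?)&', 1);
-- 123                    (lazy: stops at the first &)
select regexp_extract('uid=123&src=app&uid=456&', 'uid=(.*)&', 1);
-- 123&src=app&uid=456    (greedy: runs to the last &)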

Backreferences

A capture saves the content matched by a subexpression as a capture group in memory. The group can be referenced from outside the regex by the calling program, and it can also be referenced inside the regex itself; the latter is called a backreference. Depending on how the group is identified, backreferences come in two forms: 1. numbered groups: \number (e.g. \1) or \k<number>; 2. named groups: \k<name> or \'name'. Capture groups store the content matched by a subexpression, by index or by name, for later use; they are mainly used to find repeated content or to replace specific characters.

select 'abac' regexp '(\\w)(\\w)\\1\\2'
select regexp_replace('ababcccdcd','(\\w)(\\w)\\1\\2','')
select regexp_extract('ababcccdcd','(\\w)(\\w)\\1\\2',0)

Match reduplicated strings

e.g. haha, 123123, 12341234

select '123123' regexp '(\\w{1,})\\1';

Parameter notes

Tuning parameters

set hive.map.aggr=true;
# enable map-side (partial) aggregation

set hive.groupby.skewindata=true;
# when group-by keys are skewed, rewrite the group by into two MR jobs
# an extra MR stage is added: keys are first distributed randomly and partially aggregated, then a second stage aggregates the intermediate results for the final output
# count(distinct) is special: it cannot be partially aggregated, so this parameter does not help SQL that contains count(distinct)

#MAPJOIN
#  select /*+ MAPJOIN(supplier, room, ab) */
set hive.auto.convert.join=true;
set hive.optimize.skewjoin=true;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=100000;

# Error: Java heap space | Error: GC overhead limit exceeded | FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
# mapreduce.map.memory.mb: memory limit for one map task (MB), default 1024; a task exceeding it is killed
# mapreduce.map.java.opts: sets the JVM heap size
set mapreduce.map.memory.mb=8192;
set mapreduce.map.java.opts=-Xmx6553M;
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx6553M;

# Error: Java heap space Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143
set mapred.child.java.opts=-Xmx512M;
# io.sort.mb: memory used for sorting map output. Default 100 MB; it must fit within mapred.child.java.opts (default -Xmx200m), otherwise the task OOMs.
set io.sort.mb=50;

set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.exec.compress.intermediate=true;

set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapreduce.input.fileinputformat.split.maxsize=512000000;
set mapreduce.input.fileinputformat.split.minsize.per.node=64000000;
set mapreduce.input.fileinputformat.split.minsize.per.rack=64000000;

set mapreduce.reduce.input.buffer.percent=1;

set mapred.job.priority=HIGH;
# increase the number of reducers
set mapred.reduce.tasks = 100;
set hive.exec.reducers.bytes.per.reducer=250000000;
# increase the number of mappers
set mapred.max.split.size=52428800; -- maximum split size per mapper, in bytes
set mapred.min.split.size.per.node=1; -- minimum split size that can be combined on a node
set mapred.min.split.size.per.rack=1; -- minimum split size that can be combined on a rack

set mapred.reduce.tasks = 10; -- set the number of reducers


# disable map join
set hive.auto.convert.join=false;  -- disable automatic map join conversion
set hive.ignore.mapjoin.hint=false;  -- honor explicit MAPJOIN hints instead of ignoring them


#error in shuffle in fetcher#3
set mapreduce.reduce.shuffle.memory.limit.percent=0.15;

Handy SQL patterns

Left-pad numbers with zeros, e.g. 0 -> 00

-- 0->00
select substr(cast(power(10,2)+0 as string),2,2)
-- 00
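
An arguably simpler alternative is the built-in lpad; a small sketch:

-- lpad left-pads the string form of the number to the target width
select lpad(cast(0 as string), 2, '0');
-- 00
select lpad(cast(7 as string), 2, '0');
-- 07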

Convert an array-like string to an array

-- ["abc","ef"]
select sort_array(split(regexp_replace('["abc","ef"]','^\\[|\\]$|\\"',''),','))
-- ["abc","ef"]

Convert a map-like string to a map

-- {user_ip=58.222.204.210,qn1=null,qn2=,inner_channel=C2075}
select str_to_map(regexp_replace('{user_ip=58.222.204.210,qn1=null,qn2=,inner_channel=C2075}', '^\\{|\\}$', ''),  ',' ,'=')
-- {"qn1":"null","user_ip":"58.222.204.210","qn2":"","inner_channel":"C2075"}

Explode a map column

| Field | Type | Description |
| --- | ---: | :---: |
| init_subtract_detail | map<string,decimal(18,4)> | direct discount amount detail (initial value at order time); the key is the discount theme name, e.g. cashback and instant-discount themes (see the theme mapping table) |

LATERAL VIEW explode(init_subtract_detail) t as myMapKey,myMapValue
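
A self-contained sketch of the explode (the literal map and its keys are made up for illustration):

select myMapKey, myMapValue
from (
    select map('cashback', cast(5.0 as decimal(18,4)),
               'instant_discount', cast(3.0 as decimal(18,4))) as init_subtract_detail
) t0
lateral view explode(init_subtract_detail) t as myMapKey, myMapValue;
-- cashback           5.0000
-- instant_discount   3.0000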

Explode JSON with non-fixed node names

-- {
--     "2023702060072202200058302":{
--         "couponId":"4320943410",
--         "couponNo":"2023702060072202200058302",
--         "refundAmount":10,
--         "singleAttach":false
--     },
--     "2023702060072202200058301":{
--         "couponId":"4320943413",
--         "couponNo":"2023702060072202200058301",
--         "refundAmount":10,
--         "singleAttach":false
--     }
-- }

select 
    regexp_extract(coupon_detail_new,':(.*?)$',1)
    ,get_json_object(regexp_extract(coupon_detail_new,':(.*?)$',1), '$.couponId')
    ,get_json_object(regexp_extract(coupon_detail_new,':(.*?)$',1), '$.couponNo')
    ,get_json_object(regexp_extract(coupon_detail_new,':(.*?)$',1), '$.refundAmount')
    ,get_json_object(regexp_extract(coupon_detail_new,':(.*?)$',1), '$.singleAttach')
from (select '{"2023702060072202200058302":{"couponId":"4320943410","couponNo":"2023702060072202200058302","refundAmount":10.0,"singleAttach":false},"2023702060072202200058301":{"couponId":"4320943413","couponNo":"2023702060072202200058301","refundAmount":10.0,"singleAttach":false}}' as coupon_detail) t
lateral view explode(split(substr(replace(coupon_detail,'},','}###'),2,length(replace(coupon_detail,'},','}###'))-2),'###')) t as coupon_detail_new

-- {"couponId":"4320943410","couponNo":"2023702060072202200058302","refundAmount":10.0,"singleAttach":false}       4320943410      2023702060072202200058302       10.0    false
-- {"couponId":"4320943413","couponNo":"2023702060072202200058301","refundAmount":10.0,"singleAttach":false}       4320943413      2023702060072202200058301       10.0    false

Data validation

function do_check_and_sync_davinci(){
    ret=`hive_211 -e "
    select num>0,'校验数据正常'
    from (
        select count(*) as num
        from ticket.rpt_shop_monitor_order
    ) tmp;
    "`
    if [ ! -n "$ret" ]; then
        echo "IS NULL"
    else
        echo "NOT NULL"
        sync_davinci
    fi
}

Generate timestamps at two-minute intervals

-- a. generate an array from 0 to 1440 with step 2
-- b. explode the array
-- c. the day's start timestamp + inc*60 gives a timestamp every 2 minutes
-- d. convert the timestamp back to a datetime string
select
    case when (inc*60+unix_timestamp('$FORMAT_DATE','yyyy-MM-dd'))=unix_timestamp('$FORMAT_DATE_ADD_1','yyyy-MM-dd')
         then concat('$FORMAT_DATE',' 23:59:59') else from_unixtime(inc*60+unix_timestamp('$FORMAT_DATE','yyyy-MM-dd') ,'yyyy-MM-dd HH:mm:ss')
    end as inc_time,
    ceiling(inc/2) as flag
from (
    select explode(qhstats_seq(0,1440,2)) inc
) t

Join data skew

-- a. prefix the detail table's key with a random number under 100 to scatter the data (ceiling rounds up): concat_ws('_', cast(ceiling(rand()*99) as string), key)
-- b. expand table b 100x by prefixing its key with each number under 100: lateral view explode(qhstats_seq(1,100,1))
-- c. join on the prefixed keys
  select a.*
  from (
    select
      dt,
      wrapper_id,
      cluster_status,
      check_status,
      partner_hotel_id,
      case when wrapper_id in ('hca9008oc4l') then 'C2Q'
           when wrapper_id in ('hca9008lp9v','hca908lp9ag','hca908lp9ah','hca908lp9ai','hca908lp9aj') then 'ABE'
           else '普通代理' end as supplier_dom
    from ihotel_default.ods_qta_hotel_cluster_result
    where dt between '${zdt.addDay(-1).format("yyyyMMdd")}' and '${zdt.addDay(-1).format("yyyyMMdd")}'
      and wrapper_id like 'hca%'
      and (city != '未知' or (city ='未知' and hotel_point = ''))
  ) a
  left join (
    select
      concat_ws('_',cast(id as string),supplier_code) as supplier_code
    from (
      select
        supplier_code
      from default.mdw_order_v3_international
      where dt = '${zdt.addDay(-1).format("yyyyMMdd")}'
        and order_date between '${zdt.addDay(-30).format("yyyy-MM-dd")}' and '${zdt.addDay(-1).format("yyyy-MM-dd")}'
        and is_valid = '1'
        and (province_name in ('香港','澳门','台湾') or country_name!='中国')
        and supplier_code is not null
        and trim(supplier_code) not in ('','null','NULL')
      group by 1
    ) t lateral view explode(qhstats_seq(1,100,1)) a as id
  ) supplier on concat_ws('_', cast(ceiling(rand()*99) as string), a.wrapper_id) = supplier.supplier_code

mapjoin

  set hive.auto.convert.join=true;
  set hive.optimize.skewjoin=true;

  select
  /*+ MAPJOIN(supplier, room, ab) */

Randomly scatter null join keys to avoid skew

on case when (a.hotel_seq is null or trim(a.hotel_seq) in ('NULL', 'null', '')) then concat(rand(), 'order') else a.hotel_seq end = c.hotel_seq
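
A minimal sketch of that ON clause in a full join (table and column names are hypothetical):

select a.order_id, c.hotel_name
from order_detail a
left join dim_hotel c
  on case when (a.hotel_seq is null or trim(a.hotel_seq) in ('NULL', 'null', ''))
          then concat(rand(), 'order')
          else a.hotel_seq end = c.hotel_seq;
-- null/empty keys are replaced by a unique random value, so they spread across reducers instead of piling onto one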

collect_list with a stable order

regexp_replace(concat_ws(',', sort_array(collect_list(concat_ws(':', cast(rank as string), sight_id)))), '\\d:', '') as sight_ids
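
A worked example, assuming rank is a single digit (the stack() data is made up):

select
    city,
    regexp_replace(concat_ws(',', sort_array(collect_list(concat_ws(':', cast(rank as string), sight_id)))), '\\d:', '') as sight_ids
from (select stack(3, 'bj', 2, 's2', 'bj', 1, 's1', 'bj', 3, 's3') as (city, rank, sight_id)) t
group by city;
-- bj    s1,s2,s3
-- sort_array orders the 'rank:sight_id' strings, then the leading digit and colon are stripped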

Map each date to its next T+15 dates

-- compute day vs. next-15-day cross data: take consecutive dates from the calendar table, generate a T+15 consecutive-date array for each date and explode it, so each date maps to 15 rows of following dates
select
  calendar_date,
  from_unixtime(unix_timestamp(calendar_date,'yyyy-MM-dd'),'yyyyMMdd') as calendar_dt,
  from_unixtime(unix_timestamp(date_1,'yyyy-MM-dd'),'yyyyMMdd') as dt_1
from dim_date_info
lateral view explode(qhstats_date_array(calendar_date,date_add(calendar_date,15),1)) a as date_1
where calendar_date between '${zdt.addDay(-20).format("yyyy-MM-dd")}' and '${zdt.addDay(-1).format("yyyy-MM-dd")}'

Calendar-week running totals

--STANDARD_MONDAY=2016-01-03  # reference date used to compute day_of_week; any date falling on a Sunday works
    sum(quantity) over(partition by product_partner,is_mainland,is_ark,year,week_of_year order by day_of_week asc rows between unbounded preceding and current row) as quantity_week,
    sum(quantity) over(partition by product_partner,is_mainland,is_ark,year,month_of_year order by day_of_month asc rows between unbounded preceding and current row) as quantity_month,
    sum(profit) over(partition by product_partner,is_mainland,is_ark,year,week_of_year order by day_of_week asc rows between unbounded preceding and current row) as profit_week,
    sum(profit) over(partition by product_partner,is_mainland,is_ark,year,month_of_year order by day_of_month asc rows between unbounded preceding and current row) as profit_month,
    sum(gmv) over(partition by product_partner,is_mainland,is_ark,year,week_of_year order by day_of_week asc rows between unbounded preceding and current row) as gmv_week,
    sum(gmv) over(partition by product_partner,is_mainland,is_ark,year,month_of_year order by day_of_month asc rows between unbounded preceding and current row) as gmv_month
from (
    select
        all.report_date,
        all.product_partner,
        all.is_mainland,
        all.is_ark,
        coalesce(quantity,0) as quantity,
        coalesce(profit,0) as profit,
        coalesce(profit_rate,0) as profit_rate,
        coalesce(gmv,0) as gmv,
        coalesce(quantity_qc,0) as quantity_qc,
        coalesce(profit_qc,0) as profit_qc,
        coalesce(gmv_qc,0) as gmv_qc,
        datediff(all.report_date,trunc(all.report_date,'MM'))+1 as day_of_month,
        if(pmod(datediff(all.report_date,'$STANDARD_MONDAY'),7)=0,7,pmod(datediff(all.report_date,'$STANDARD_MONDAY'),7)) as day_of_week,
        weekofyear(all.report_date) as week_of_year,
        month(all.report_date) as month_of_year,
        year(all.report_date) as year

Concatenate a column in order within each group

-- In the subquery before collect_set, distribute by id and sort by id, rank: the rows are distributed across reducers by id and sorted by rank before aggregation.
with demo_dataset as ( --Use your table instead of this CTE
  select stack(4,
  '001' , 'pass', 2,
  '002' , 'fail', 3,
  '001' , 'fail', 1,
  '002' , 'pass', 1
  ) as (id,result,rank)
)

select id, concat_ws('-',collect_set(result))
from
(
  select t.* 
    from demo_dataset t
  distribute by id   --Distribute by grouping column
  sort by id, rank   --Sort in required order
) s
group by id

-- result
id  results
001 fail-pass
002 pass-fail
-- changing to sort by id, rank desc concatenates in descending rank order within each id group

Computing UV quickly on large data

Count one day's request volume in the content-recall log; the SQL:
select dt,count(distinct request_id) cnt from c_desert_feed.dw_feedstream_recall_info where dt >= '20220601' and dt <= '20220605' group by dt;
For 5 days of data the query did not finish even after an hour of running.

Judging from the execution plan, each day's volume is so large that a single reducer handling a whole day is very slow, so the SQL was rewritten to deduplicate first and then count, which in theory should be much faster:
select
    dt,
    count(1) cnt
from (
    select dt,request_id cnt from c_desert_feed.dw_feedstream_recall_info where dt >= '20220601' and dt <= '20220605' group by dt, request_id
) as t
group by dt;
The two execution plans are almost identical, so it is still very slow; the fix is simply to force Hive to run the aggregation as two separate jobs.
## Fix
-- took 380.366 seconds
select
    dt, sum(cnt) as cnt
from (
    select
        dt, round(rand()*100, 0) as rn, count(1) cnt
    from (
        select dt, request_id from c_desert_feed.dw_feedstream_recall_info where dt >= '20220601' and dt <= '20220605' group by dt, request_id
    ) t
    group by 1, 2
) as t
group by dt;

Window-function avg fails with: BigInteger divide by zero

When computing avg with a window function, a "divisor cannot be zero" error occasionally shows up.

Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":0,"reducesinkkey1":"2022-01-05"},"value":{"_col1":null}}
        at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:257)
        at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":0,"reducesinkkey1":"2022-01-05"},"value":{"_col1":null}}
        at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:245)
        ... 7 more
Caused by: java.lang.ArithmeticException: BigInteger divide by zero
        at java.math.MutableBigInteger.divide(MutableBigInteger.java:1440)
        at java.math.BigDecimal.divideAndRound(BigDecimal.java:4248)
        at java.math.BigDecimal.divide(BigDecimal.java:5191)
        at java.math.BigDecimal.divide(BigDecimal.java:1561)
        at java.math.BigDecimal.divide(BigDecimal.java:1591)
        at org.apache.hadoop.hive.common.type.HiveDecimal.divide(HiveDecimal.java:234)
        at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage$GenericUDAFAverageEvaluatorDecimal$1.getNextResult(GenericUDAFAverage.java:319)
        at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage$GenericUDAFAverageEvaluatorDecimal$1.getNextResult(GenericUDAFAverage.java:301)
        at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator$SumAvgEnhancer.iterate(GenericUDAFStreamingEvaluator.java:190)
        at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.aggregate(GenericUDAFEvaluator.java:188)
        at org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.processRow(WindowingTableFunction.java:412)
        at org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.processRow(PTFOperator.java:325)
        at org.apache.hadoop.hive.ql.exec.PTFOperator.process(PTFOperator.java:138)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:879)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
        at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:236)

double does not throw and the result is NaN, because dividing a double by 0 does not raise an error; it returns NaN.
decimal throws the error above.

Fix
1. Change the column type from decimal to double.
2. Compute sum and count separately and derive the avg yourself (see the sketch below).
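
A sketch of option 2 (table and column names are hypothetical); computing sum and count separately avoids the decimal division inside the streaming avg:

select
    dt,
    case when cnt = 0 then null else total / cnt end as avg_amount
from (
    select
        dt,
        sum(amount)   over (partition by dt) as total,
        count(amount) over (partition by dt) as cnt
    from some_table
) t;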

Data loss caused by CTE (with ... as ...) materialization

A SQL fragment that is used several times is usually defined once via with ... as, which is convenient and keeps the SQL tidy. The only catch is that, by default, Hive does not materialize with ... as fragments; that is fine for simple logic, but a complex fragment gets re-evaluated each time it is referenced, increasing resource usage and hurting performance.

To make Hive materialize fragments defined with with ... as, set the following parameter:

-- if a with ... as block is referenced at least this many times (here 2), it is materialized into a temporary table, so the block runs only once
-- the default is -1, which disables this optimization
set hive.optimize.cte.materialize.threshold = 2

In practice, after enabling this materialization parameter, data occasionally went missing and nothing was written to the target table.
The rough cause is that when the with ... as block is materialized, the dependencies between the resulting jobs are not handled correctly, so some jobs start too early and part of the data is lost.

  • Fix: the materialization of with ... as is itself implemented via a temporary table, so if Hive's own materialization is unreliable, materialize the fragment yourself with an explicit temporary table:
    create temporary table t stored as orc as
    ...
    

Shell

Run shell functions in parallel (run in the background, then check each job's exit status)

function run_basic() {
    ( sleep 3; calculate_q_order ) &
    ( sleep 3; calculate_q_checkout ) &

    local ret_code=0
    for pid in $(jobs -p)
    do
        wait "${pid}"
        if (($?!=0)); then
            ((ret_code++))
        fi
    done
    return $ret_code
}
--1. command &   run the process in the background
--2. jobs        list background jobs
--3. fg %n       bring background job n to the foreground
--4. bg %n       send job n to the background
--PS: "n" is the job number shown by jobs.

alias

alias del_davinci='curl -X POST http://davincidata.corp.qunar.com/data/kudu/drop_pg_table -d "business=hotel&tableName=$1&password=******"'
alias product_db='psql "host ={host} port =5432 user ={user} password ={password} dbname ={dbname} "'
alias ftp_enter='lftp {username}:{password}@{host}:{port}'
# alias with arguments
alias cd='test() { cd $1; ls;};test'
# make aliases permanent
## put all alias definitions in an .alias file under $HOME, then add the following snippet to .bashrc
# Aliases
if [ -f ~/.alias ]; then
  . ~/.alias
fi

Common shell syntax

# declare a local array variable inside a function
function func {
    local a=()
}

# test that a string variable is non-empty
if [ -n "${msg}" ];then
    echo "not null"
fi

# string replacement
echo "${msg}" | sed 's/old/new/g'

# field extraction
awk -F"#@#@#" '{print $2}'
awk -F ':' '{print $NF}'   # last field

# deduplicate lines
echo "${msg}" | sort -u

Backfill loops

Consecutive dates

#!/bin/bash

START_DATE='20200813'
END_DATE='20201011'
EXEC_DATE=${START_DATE}

while [ ${EXEC_DATE} -le ${END_DATE} ]
do
echo ${EXEC_DATE}
job_test rpt_qc_order_price_comparison/new_druid ${EXEC_DATE}
EXEC_DATE=`date -d "1 day ${EXEC_DATE}" +"%Y%m%d"`
done

Non-consecutive dates

#!/bin/bash

DATE_ARRAY=(
'20180727'
'20181016'
'20181018'
)

for EXEC_DATE in ${DATE_ARRAY[@]}
do
echo ${EXEC_DATE}
job_test mdw_order_v3/all_extend ${EXEC_DATE}
done

Hourly execution

#!/bin/bash

for HOUR in {00..23} ; do
  echo 'run ...'
done

Automatically merge master into a branch and push

#!/bin/bash
branch=$1

if [ -z "$branch" ]; then # $branch is empty
  cd /home/q/home/junjieiot.wang/hotel-analysis-jobs
  git branch -a | grep junjieiot.wang
else
  echo "git push $branch"
  cd /home/q/home/junjieiot.wang/hotel-analysis-jobs
  git checkout master
  git pull
  git checkout $branch
  git pull
  git merge master
  git push origin $branch
fi

Run in the background with screen

# detach from the screen session
Ctrl + a + d
# list screen sessions
screen -ls
# reattach to a session
screen -r 192218
# close the session
exit

Database/FTP connections

MySQL

mysql -u{username} -h{hostname} -P{port} -D{dbname} -p{password}

postgresql

psql "host ={host} port =5432 user ={user} password ={password} dbname ={dbname} "

ftp

lftp {username}:{password}@{host}:{port}
