Productivity Toolkit Notes

Hive SQL

Common errors

hive msck repair table error

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask

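Fix: set hive.msck.path.validation=ignore; then run msck repair table db_name.table_name;
hive.msck.path.validation controls how msck handles partition-like directories whose names fail validation: throw (the default) aborts the command, skip ignores those directories, and ignore turns the validation off.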

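Column misalignment (values shifting into the wrong columns)

In text-format tables, a value that contains a row or field delimiter character (for example \n, \t or \r) splits one row into several or shifts its columns. Two ways around it: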

--Store temporary tables in ORC format
create table tmp_table stored as orc as select ...
--Or strip the special characters before use
regexp_replace(room_name,'\\n|\\t|\\r', '') as room_name
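A minimal shell sketch of why text storage misaligns, assuming a plain newline-terminated text file (Hive's default text row delimiter is \n): a value containing a newline becomes two physical lines, so the reader sees two broken rows. `clean_field` and `row_lines` are hypothetical helpers; `clean_field` strips the same characters as the regexp_replace above.

```bash
#!/bin/bash
# Hypothetical helper: drop \n, \t and \r from a value,
# mirroring regexp_replace(col, '\\n|\\t|\\r', '') in Hive.
clean_field() {
    printf '%s' "$1" | tr -d '\n\t\r'
}

# Hypothetical helper: write one logical row (two tab-separated columns)
# and count how many physical lines it occupies.
row_lines() {
    printf '%s\t%s\n' "$1" "$2" | wc -l | tr -d ' '
}
```

For example, `row_lines 1 "$(printf 'Deluxe\nRoom')"` reports 2 lines for what should be a single row, while the cleaned value fits on one line.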

hadoop "Unexpected end of input stream" error

Error message:

2021-05-31 10:57:57,333 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: java.io.IOException: java.io.EOFException: Unexpected end of input stream
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
	at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:231)
	at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.next(HadoopShimsSecure.java:141)
	at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:199)
	at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:185)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:52)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: java.io.EOFException: Unexpected end of input stream
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:355)
	at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:106)
	at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:42)
	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:116)
	at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:228)
	... 11 more
Caused by: java.io.EOFException: Unexpected end of input stream
	at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:145)
	at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
	at java.io.InputStream.read(InputStream.java:101)
	at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:350)
	... 15 more
Path of the file read by the corresponding mapper:

2021-05-31 10:57:56,103 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: Paths:/user/qhstats/ods/ods_hotel_log_breeze/dt=20210528/hour=21/h_hspa_l-qtaspaphy15.h.cn2_out.product.log.2021-05-28-21.gz:0+115093InputFormatClass: org.apache.hadoop.mapred.TextInputFormat
The error points to an I/O problem: reading the input data failed during the map stage.

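explain extended shows which files the query reads. Since they were gzip-compressed text files, testing them with zcat traced the failure to a corrupt .gz file. After the bad data was deleted, the job recovered.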
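The zcat check can be scripted. A minimal sketch that tests local copies of the .gz inputs with `gzip -t` (an integrity test equivalent to decompressing with zcat) and prints the corrupt ones; for files still on HDFS one could stream them instead, e.g. `hdfs dfs -cat <path> | gzip -t`. The function name `find_bad_gz` is hypothetical.

```bash
#!/bin/bash
# Hypothetical helper: print every argument that is not a valid gzip file.
# Returns 0 if all files pass, 1 otherwise.
find_bad_gz() {
    local f bad=0
    for f in "$@"; do
        if ! gzip -t "$f" 2>/dev/null; then   # truncated/corrupt streams fail here
            echo "CORRUPT: $f"
            bad=1
        fi
    done
    return "$bad"
}
```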

java.lang.OutOfMemoryError: Java heap space error

2021-09-22 16:43:02,967 ERROR [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.orc.impl.OutStream.getNewOutputBuffer(OutStream.java:119)
	at org.apache.orc.impl.OutStream.spill(OutStream.java:172)
	at org.apache.orc.impl.OutStream.write(OutStream.java:149)
	at org.apache.orc.impl.WriterImpl$StringBaseTreeWriter.flushDictionary(WriterImpl.java:1336)
	at org.apache.orc.impl.WriterImpl$StringBaseTreeWriter.createRowIndexEntry(WriterImpl.java:1390)
	at org.apache.orc.impl.WriterImpl$TreeWriter.createRowIndexEntry(WriterImpl.java:844)
	at org.apache.orc.impl.WriterImpl.createRowIndexEntry(WriterImpl.java:2480)
	at org.apache.orc.impl.WriterImpl.flushStripe(WriterImpl.java:2487)
	at org.apache.orc.impl.WriterImpl.close(WriterImpl.java:2787)
	at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:313)
	at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:120)
	at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:190)
	at org.apache.hadoop.hive.ql.exec.FileSinkOperator.closeOp(FileSinkOperator.java:1036)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:683)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
	at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.close(ExecMapper.java:189)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

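#Analysis
The stack trace shows that the map task had already finished reading records from HDFS and ran out of memory while writing them out as an ORC file (the customer's target table uses the ORC format).
In short, the failed map task did not split the raw text into lines correctly: it treated the remaining 200,000+ records as one very large record, and converting that record to ORC exhausted the heap.

#Root cause
This usually means the raw data file has incorrect line delimiters or contains unusual characters, so the text is read incorrectly. Because the problem file had already been deleted, and newly generated files kept loading successfully without this issue, it was no longer possible to determine what exactly was wrong with the original file.

#Solution
Use the failed map task to locate the problem data file, move the file aside, and rerun the load job; it will then succeed. Since Hive loads data row by row, moving this data aside does not leave the loaded data inconsistent: just reprocess that portion and load it again. Keep the problem file and analyze its text to find out why the customer-specified line delimiter does not split it into the correct rows.
To locate the problem file from the failed map task:
1. On the JobHistory page, open the failed map task of the failed job and view its log, or fetch the log from HDFS under "/tmp/logs/bigdata/logs/application_<job id>".
2. Search the failed map task's log for "Processing split"; you will find a line like: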
2016-08-09 16:33:19,445 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: Paths:/hzsrc/external/khzl_xj_yd/3aca1f56-d7d0-37c4-b37b-f2a3340182a7/KHZL_XJ_YD@2016070404@append@337d27a0-ce7e-3732-944f-fc2239122b0e.6491959.4880125171.dat:1073741824+134217728,/hzsrc/external/khzl_xj_yd/3aca1f56-d7d0-37c4-b37b-f2a3340182a7/KHZL_XJ_YD@2016070404@append@33f6e047-e5e3-32c0-8407-959282b435b6.7479369.5504391741.dat:402653184+134217728InputFormatClass: org.apache.hadoop.mapred.TextInputFormat
The paths listed in this line are the problematic source data files.
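Reading the paths out of that line by hand is error-prone when one split combines several files. A sketch of a hypothetical `split_paths` filter that reads task log text on stdin and prints one file per line, dropping the `:offset+length` suffix; it assumes the `Processing split: Paths:...InputFormatClass` layout shown above.

```bash
#!/bin/bash
# Hypothetical filter: stdin = map task log text, stdout = one input file path per line.
split_paths() {
    grep -o 'Processing split: Paths:.*InputFormatClass' \
        | sed -e 's/^Processing split: Paths://' -e 's/InputFormatClass$//' \
        | tr ',' '\n' \
        | sed -E 's/:[0-9]+\+[0-9]+$//'   # drop the ":offset+length" suffix
}
```

For example, `yarn logs -applicationId application_<job id> | split_paths`, where the application id is a placeholder.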

#Reference
https://support.huawei.com/enterprise/zh/knowledge/EKB1000243069

Skewed keys

A skewed join key shows up in the join operator's task log like this:

2021-04-07 16:37:00,614 INFO [main] org.apache.hadoop.hive.ql.exec.CommonJoinOperator: table 0 has 16000 rows for join key [eczb9510]
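A sketch for ranking skewed keys from a task log, assuming lines shaped like the one above and keys without spaces. It assumes the same key may be logged several times as its row count grows, so only the largest count per key is kept. `top_skew_keys` is a hypothetical name; its optional argument is how many keys to show (default 10).

```bash
#!/bin/bash
# Hypothetical filter: stdin = task log text, stdout = "<rows> <key>", largest first.
top_skew_keys() {
    grep -o 'has [0-9]* rows for join key \[[^]]*]' \
        | sed -E 's/^has ([0-9]+) rows for join key \[(.*)]$/\1 \2/' \
        | sort -k1,1nr \
        | awk '!seen[$2]++' \
        | head -n "${1:-10}"
}
```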

Regex matching

Extract a specific value from a string

coalesce(regexp_extract(params, 'uid=(.*?)&', 1), '') as uid
Lazy (shortest) match: (.*?); greedy (longest) match: (.*).
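sed has no lazy quantifier, so the usual substitute for a lazy `(.*?)&` is the negated class `([^&]*)`, which also copes with a parameter at the very end of the string (where a pattern requiring a trailing `&` cannot match). A sketch with a hypothetical `get_param` helper; parameter names are assumed to contain no regex metacharacters.

```bash
#!/bin/bash
# Hypothetical helper: value of parameter $2 in query string $1, or nothing if absent.
# A leading "&" is added so the first parameter matches too; "[^&]*" stops at the next "&".
get_param() {
    printf '&%s\n' "$1" | sed -n "s/.*&$2=\([^&]*\).*/\1/p"
}
```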

Backreferences

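A capturing group stores the text matched by a subexpression in memory. It can be referenced from program code outside the regex, and also from inside the regex itself; the in-pattern reference is called a backreference. Depending on how the group is identified, backreferences come in two kinds:
1. Numbered groups: \k<number> or \number
2. Named groups: \k<name> or \k'name'
Capture groups save matched subexpressions by number or name for later use, mainly to find repeated content or to replace specific characters. Hive uses Java regular expressions (which support \number and \k<name>), and inside a Hive string literal each backslash must be doubled:
select 'abac' regexp '(\\w)(\\w)\\1\\2';                     -- false: the pattern needs XYXY, e.g. 'abab'
select regexp_replace('ababcccdcd', '(\\w)(\\w)\\1\\2', '');  -- 'cc'
select regexp_extract('ababcccdcd', '(\\w)(\\w)\\1\\2', 0);   -- 'abab'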
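POSIX basic regular expressions (grep and sed without -E) support `\1`-style backreferences too, so the regexp and regexp_replace examples above can be reproduced in the shell. A sketch; `has_xyxy` and `strip_xyxy` are hypothetical names, and `[[:alnum:]_]` stands in for `\w`.

```bash
#!/bin/bash
# Hypothetical helper: true if $1 contains some XYXY sequence (like Hive's '(\\w)(\\w)\\1\\2').
has_xyxy() {
    printf '%s\n' "$1" | grep -q '\([[:alnum:]_]\)\([[:alnum:]_]\)\1\2'
}

# Hypothetical helper: delete every XYXY sequence (like regexp_replace with '').
strip_xyxy() {
    printf '%s\n' "$1" | sed 's/\([[:alnum:]_]\)\([[:alnum:]_]\)\1\2//g'
}
```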

Match doubled words

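e.g. haha, 123123, 12341234:
select '123123' regexp '(\\w{1,})\\1';
REGEXP is true if the pattern matches any substring, so this is also true for values such as 'aab' or 'xhahay'; use '^(\\w+)\\1$' to require the whole value to be a doubled word.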
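To require the whole value to be a doubled word rather than merely contain one, the pattern has to be anchored; in the shell, `grep -x` does that. A sketch with a hypothetical `is_doubled_word`, roughly equivalent to an anchored `(\w+)\1`.

```bash
#!/bin/bash
# Hypothetical helper: true if the whole string is a word repeated twice (XX),
# e.g. haha, 123123, 12341234.
is_doubled_word() {
    printf '%s\n' "$1" | grep -qx '\([[:alnum:]_]\{1,\}\)\1'
}
```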

Parameter notes

Tuning parameters

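# Enable map-side aggregation
set hive.map.aggr=true;

# When group by data is skewed, split the query into two MR jobs.
# Hive adds an extra MR stage that distributes keys randomly and partially aggregates them, then aggregates those intermediate results into the final result.
# count(distinct) is special: it cannot be partially aggregated in the intermediate stage, so this setting does not apply to SQL with count(distinct).
set hive.groupby.skewindata=true;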

#MAPJOIN
#  select /*+ MAPJOIN(supplier, room, ab) */
set hive.auto.convert.join=true;
set hive.optimize.skewjoin=true;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=100000;

# Error: Java heap space | Error: GC overhead limit exceeded | FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
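# mapreduce.map.memory.mb: memory limit of one map task container (MB), default 1024; a map task that uses more than this is killed
# mapreduce.map.java.opts: JVM options of the map task, here the heap size (-Xmx), which should stay below mapreduce.map.memory.mb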
set mapreduce.map.memory.mb=8192;
set mapreduce.map.java.opts=-Xmx6553M;
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx6553M;
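The heap values above are 80% of the container size (8192 × 0.8 ≈ 6553), which leaves headroom for the JVM's non-heap memory. A sketch that derives all four settings from one container size, assuming that same 80% ratio (a rule of thumb taken from these numbers, not a Hadoop default); `mr_mem_settings` is hypothetical.

```bash
#!/bin/bash
# Hypothetical helper: print map/reduce memory settings for a container size in MB,
# with the heap at 80% of the container (integer arithmetic: 8192 -> 6553).
mr_mem_settings() {
    local mb=$1
    local heap=$((mb * 8 / 10))
    local phase
    for phase in map reduce; do
        printf 'set mapreduce.%s.memory.mb=%d;\n' "$phase" "$mb"
        printf 'set mapreduce.%s.java.opts=-Xmx%dM;\n' "$phase" "$heap"
    done
}
```

`mr_mem_settings 8192` prints exactly the four set statements above.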

# Error: Java heap space Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143
set mapred.child.java.opts=-Xmx512M;
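# io.sort.mb: memory used to sort map output, default 100 MB (newer name: mapreduce.task.io.sort.mb). It must fit within the heap set by mapred.child.java.opts (default -Xmx200m), otherwise the task runs out of memory.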
set io.sort.mb=50;

set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.exec.compress.intermediate=true;

set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
# LzopCodec comes from the hadoop-lzo library, which must be installed on the cluster
set mapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapreduce.input.fileinputformat.split.maxsize=512000000;
set mapreduce.input.fileinputformat.split.minsize.per.node=64000000;
set mapreduce.input.fileinputformat.split.minsize.per.rack=64000000;

set mapreduce.reduce.input.buffer.percent=1;

set mapred.job.priority=HIGH;
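# Increase the number of reducers (an explicit mapred.reduce.tasks overrides the estimate based on hive.exec.reducers.bytes.per.reducer)
set mapred.reduce.tasks=100;
set hive.exec.reducers.bytes.per.reducer=250000000;
# Increase the number of mappers
set mapred.max.split.size=52428800; -- maximum input size handled by one map, in bytes
set mapred.min.split.size.per.node=1; -- minimum split size within a node
set mapred.min.split.size.per.rack=1; -- minimum split size within a rack

set mapred.reduce.tasks=10; -- set the number of reducers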


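# Disable map join
set hive.auto.convert.join=false;  -- disable automatic map join conversion
set hive.ignore.mapjoin.hint=false;  -- false means /*+ MAPJOIN */ hints are honored (the default true ignores them)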


#error in shuffle in fetcher#3
set mapreduce.reduce.shuffle.memory.limit.percent=0.15;

Odd SQL requests

Zero-pad a number, e.g. 0 -> 00

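--0->00 (add 100, then drop the leading 1; for values 0 to 99, lpad(cast(log_hour as string), 2, '0') gives the same result)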
substr(cast(power(10,2)+log_hour as string),2,2)
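The same add-100-and-drop-the-1 trick in the shell, handy for the `{00..23}` hour loop further down. A sketch with a hypothetical `pad2`; like the SQL version it only works for 0 to 99.

```bash
#!/bin/bash
# Hypothetical helper mirroring substr(cast(power(10,2)+n as string),2,2):
# add 100, then keep the two characters after the leading "1".
pad2() {
    local s=$((100 + 10#$1))   # 10# forces base 10, so "08" is not read as octal
    echo "${s:1:2}"
}
```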

Parse an array-like string into an array

--["abc","ef"] (sort_array also sorts the elements; drop it to keep the original order)
select sort_array(split(regexp_replace('["abc","ef"]','^\\[|\\]$|\\"',''),','))

Parse a map-like string into a map

--{user_ip=58.222.204.210, qn1=null,qn2=, inner_channel=C2075}
select str_to_map(regexp_replace('{user_ip=58.222.204.210, qn1=null,qn2=, inner_channel=C2075}', '^\\{|\\}$', ''),  ',' ,'=')

Explode a map column

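| Column | Type | Description |
| -------- | -----: | :----: |
| init_subtract_detail | map<string,decimal(18,4)> | Direct discount amount details (initial value at order time); key = discount theme name, see the cashback / direct-discount theme mapping table |

LATERAL VIEW explode(init_subtract_detail) t as myMapKey,myMapValue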

Data validation

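function do_check_and_sync_davinci(){
    # Return a row only when the table has data, so empty output means the check failed
    # (selecting num>0 would always return a row, "false" included)
    ret=$(hive_211 -e "
    select 'data check OK'
    from (
        select count(*) as num
        from ticket.rpt_shop_monitor_order
    ) tmp
    where num > 0;
    ")
    if [ -z "$ret" ]; then
        echo "IS NULL"
    else
        echo "NOT NULL"
        sync_davinci
    fi
}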
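The intent of the function above (sync only when the check finds data) can be separated from Hive so it is testable. A sketch with hypothetical names: `run_if_nonempty` takes a check function and an action function, runs the action only if the check prints something, and returns 2 if the check command itself fails, so a broken Hive call is not mistaken for empty data.

```bash
#!/bin/bash
# Hypothetical helper: run "$2" only if "$1" succeeds and prints non-empty output.
# Returns the action's status if it ran, 1 if the check printed nothing,
# 2 if the check command failed.
run_if_nonempty() {
    local check_fn=$1 action_fn=$2 ret
    ret=$("$check_fn") || return 2
    if [ -z "$ret" ]; then
        echo "IS NULL" >&2
        return 1
    fi
    echo "NOT NULL" >&2
    "$action_fn"
}
```

For example, wrap the hive_211 query in a `check` function and call `run_if_nonempty check sync_davinci`.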

Generate a timestamp every 2 minutes

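--a. Generate an array from 0 to 1440 in steps of 2 (minutes in a day; qhstats_seq is a custom UDF, not a Hive built-in)
--b. Explode the array
--c. Start-of-day timestamp + inc*60 gives one timestamp every 2 minutes; the last value (next midnight) is replaced by 23:59:59
--d. Convert the timestamps to datetime strings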
select
    case when (inc*60+unix_timestamp('$FORMAT_DATE','yyyy-MM-dd'))=unix_timestamp('$FORMAT_DATE_ADD_1','yyyy-MM-dd')
         then concat('$FORMAT_DATE',' 23:59:59') else from_unixtime(inc*60+unix_timestamp('$FORMAT_DATE','yyyy-MM-dd') ,'yyyy-MM-dd HH:mm:ss')
    end as inc_time,
    ceiling(inc/2) as flag
from (
    select explode(qhstats_seq(0,1440,2)) inc
) t
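The same slot logic as a shell sketch, assuming a fixed 1440-minute day (true in time zones without DST changes): instead of adding seconds to a Unix timestamp it formats each minute offset as HH:MM directly. `two_minute_slots` is a hypothetical name and `$day` is a yyyy-MM-dd string like `$FORMAT_DATE`.

```bash
#!/bin/bash
# Hypothetical helper: print "<datetime>\t<flag>" for every 2-minute slot of one day.
# Minutes 0..1440 in steps of 2; the 1440 slot (next midnight) becomes 23:59:59,
# and flag = inc / 2, which equals ceiling(inc/2) for even inc.
two_minute_slots() {
    local day=$1 inc t
    for ((inc = 0; inc <= 1440; inc += 2)); do
        if ((inc == 1440)); then
            t="$day 23:59:59"
        else
            printf -v t '%s %02d:%02d:00' "$day" $((inc / 60)) $((inc % 60))
        fi
        printf '%s\t%d\n' "$t" $((inc / 2))
    done
}
```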

Join data skew

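--a. Add a random prefix below 100 to the key of the detail table to spread its rows (ceiling rounds up): concat_ws('_', cast(ceiling(rand()*99) as string), key)
--b. Replicate table b 100 times, prefixing its key with each value from 1 to 100: lateral view explode(qhstats_seq(1,100,1))
--c. Join on the prefixed keys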
  select a.*
  from (
    select
      dt,
      wrapper_id,
      cluster_status,
      check_status,
      partner_hotel_id,
      case when wrapper_id in ('hca9008oc4l') then 'C2Q'
           when wrapper_id in ('hca9008lp9v','hca908lp9ag','hca908lp9ah','hca908lp9ai','hca908lp9aj') then 'ABE'
           else '普通代理' end as supplier_dom
    from ihotel_default.ods_qta_hotel_cluster_result
    where dt between '${zdt.addDay(-1).format("yyyyMMdd")}' and '${zdt.addDay(-1).format("yyyyMMdd")}'
      and wrapper_id like 'hca%'
      and (city != '未知' or (city ='未知' and hotel_point = ''))
  ) a
  left join (
    select
      concat_ws('_',cast(id as string),supplier_code) as supplier_code
    from (
      select
        supplier_code
      from default.mdw_order_v3_international
      where dt = '${zdt.addDay(-1).format("yyyyMMdd")}'
        and order_date between '${zdt.addDay(-30).format("yyyy-MM-dd")}' and '${zdt.addDay(-1).format("yyyy-MM-dd")}'
        and is_valid = '1'
        and (province_name in ('香港','澳门','台湾') or country_name!='中国')
        and supplier_code is not null
        and trim(supplier_code) not in ('','null','NULL')
      group by 1
    ) t lateral view explode(qhstats_seq(1,100,1)) a as id
  ) supplier on concat_ws('_', cast(ceiling(rand()*99) as string), a.wrapper_id) = supplier.supplier_code

mapjoin

  set hive.auto.convert.join=true;
  set hive.optimize.skewjoin=true;

  select
  /*+ MAPJOIN(supplier, room, ab) */

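Randomize empty join keys to avoid data skew

Null or empty keys all hash to the same reducer. Replacing them with random values spreads those rows out, and the random values are not expected to match any c.hotel_seq, so those rows still find no match: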

on case when (a.hotel_seq is null or trim(a.hotel_seq) in ('NULL', 'null', '')) then concat(rand(), 'order') else a.hotel_seq end = c.hotel_seq

Keep order with collect_list

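-- collect_list does not guarantee order: prefix each value with a zero-padded rank (so the string sort matches numeric order), sort, then strip the whole 'rank:' prefix
-- '\\\\d+:' presumably assumes the SQL sits inside a double-quoted shell string; written directly in Hive it is '\\d+:'
regexp_replace(concat_ws(',', sort_array(collect_list(concat_ws(':', lpad(cast(rank as string), 5, '0'), sight_id)))), '\\\\d+:', '') as sight_ids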
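The ordering pitfall is easy to reproduce in the shell: plain string sorting puts `10:x` before `2:y`, so the rank needs a fixed width before sort_array. A sketch with a hypothetical `ordered_ids` that pads ranks to 5 digits, sorts as plain strings (`LC_ALL=C`), strips the prefix, and joins with commas; ranks are assumed to be non-negative integers below 100000 without leading zeros.

```bash
#!/bin/bash
# Hypothetical filter: stdin = "<rank> <id>" lines in any order,
# stdout = ids joined by "," in rank order.
ordered_ids() {
    local rank id
    while read -r rank id; do
        printf '%05d:%s\n' "$rank" "$id"      # zero-pad so string order == numeric order
    done | LC_ALL=C sort | sed 's/^[0-9]*://' | paste -sd, -
}
```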

Map each date to its T+15 dates

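--Cross each day with its next 15 days: take consecutive dates from the calendar, build an array of consecutive dates up to T+15 for each date (qhstats_date_array is a custom UDF) and explode it, so each date yields one row per date in its T+15 window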
select
  calendar_date,
  from_unixtime(unix_timestamp(calendar_date,'yyyy-MM-dd'),'yyyyMMdd') as calendar_dt,
  from_unixtime(unix_timestamp(date_1,'yyyy-MM-dd'),'yyyyMMdd') as dt_1
from dim_date_info
lateral view explode(qhstats_date_array(calendar_date,date_add(calendar_date,15),1)) a as date_1
where calendar_date between '${zdt.addDay(-20).format("yyyy-MM-dd")}' and '${zdt.addDay(-1).format("yyyy-MM-dd")}'
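A shell sketch of the T..T+15 expansion for one date, reusing the `date -d "N day YYYYMMDD"` form from the backfill loop below. It assumes GNU date and includes T itself; the boundary behavior of the custom qhstats_date_array is not shown here, so check it against the UDF. `dates_t_plus` is hypothetical.

```bash
#!/bin/bash
# Hypothetical helper: print start date $1 (yyyyMMdd) and the following $2 days, one per line.
dates_t_plus() {
    local start=$1 days=$2 i
    for ((i = 0; i <= days; i++)); do
        date -d "$i day $start" +%Y%m%d
    done
}
```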

Week-to-date and month-to-date running totals

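--STANDARD_MONDAY=2016-01-03 # reference date for day_of_week; any Sunday works and yields Monday = 1 ... Sunday = 7
--Note: weekofyear() uses ISO-style weeks, so days around New Year can fall in week 52/53 or week 1 of the neighboring year; partitioning by year splits such a week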
    sum(quantity) over(partition by product_partner,is_mainland,is_ark,year,week_of_year order by day_of_week asc rows between unbounded preceding and current row) as quantity_week,
    sum(quantity) over(partition by product_partner,is_mainland,is_ark,year,month_of_year order by day_of_month asc rows between unbounded preceding and current row) as quantity_month,
    sum(profit) over(partition by product_partner,is_mainland,is_ark,year,week_of_year order by day_of_week asc rows between unbounded preceding and current row) as profit_week,
    sum(profit) over(partition by product_partner,is_mainland,is_ark,year,month_of_year order by day_of_month asc rows between unbounded preceding and current row) as profit_month,
    sum(gmv) over(partition by product_partner,is_mainland,is_ark,year,week_of_year order by day_of_week asc rows between unbounded preceding and current row) as gmv_week,
    sum(gmv) over(partition by product_partner,is_mainland,is_ark,year,month_of_year order by day_of_month asc rows between unbounded preceding and current row) as gmv_month
from (
    select
        all.report_date,
        all.product_partner,
        all.is_mainland,
        all.is_ark,
        coalesce(quantity,0) as quantity,
        coalesce(profit,0) as profit,
        coalesce(profit_rate,0) as profit_rate,
        coalesce(gmv,0) as gmv,
        coalesce(quantity_qc,0) as quantity_qc,
        coalesce(profit_qc,0) as profit_qc,
        coalesce(gmv_qc,0) as gmv_qc,
        datediff(all.report_date,trunc(all.report_date,'MM'))+1 as day_of_month,
        if(pmod(datediff(all.report_date,'$STANDARD_MONDAY'),7)=0,7,pmod(datediff(all.report_date,'$STANDARD_MONDAY'),7)) as day_of_week,
        weekofyear(all.report_date) as week_of_year,
        month(all.report_date) as month_of_year,
        year(all.report_date) as year
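A sketch that reproduces the day_of_week expression in the shell to sanity-check the reference date: the number of days since 2016-01-03 (a Sunday), taken modulo 7 with a non-negative remainder like pmod, with 0 mapped to 7. It assumes GNU date (`-d`), and `day_of_week` is a hypothetical name; the result should agree with `date +%u` (Monday = 1).

```bash
#!/bin/bash
# Hypothetical helper: ISO-style day of week (Mon=1..Sun=7) for a yyyy-MM-dd date,
# computed like if(pmod(datediff(d, ref), 7) = 0, 7, pmod(datediff(d, ref), 7)).
day_of_week() {
    local ref=2016-01-03   # any Sunday, like STANDARD_MONDAY above
    local diff=$(( ( $(date -u -d "$1" +%s) - $(date -u -d "$ref" +%s) ) / 86400 ))
    local dow=$(( (diff % 7 + 7) % 7 ))   # pmod: non-negative remainder
    if [ "$dow" -eq 0 ]; then
        dow=7
    fi
    echo "$dow"
}
```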

Shell

Run shell functions in parallel (in the background, then check each task's exit status)

function run_basic() {
    ( sleep 3; calculate_q_order ) &
    ( sleep 3; calculate_q_checkout ) &

    local ret_code=0
    for pid in $(jobs -p)
    do
        wait "${pid}"
        if (($?!=0)); then
            ((ret_code++))
        fi
    done
    return $ret_code
}
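--1. command & runs the command in the background
--2. jobs lists the background jobs
--3. fg %n brings job n to the foreground
--4. bg %n resumes job n in the background
--PS: n is the job number shown by jobs.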
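A variant of run_basic that records each background PID as it starts (`$!`) instead of reading `jobs -p`, so unrelated background jobs in the same shell are not waited on, and it reports which function failed. A sketch; `run_parallel` is a hypothetical name that takes function names as arguments and returns the number of failures.

```bash
#!/bin/bash
# Hypothetical helper: run each named function in the background, wait for all,
# print the names of the ones that failed, and return the number of failures.
run_parallel() {
    local fn i failed=0
    local -a pids=() names=()
    for fn in "$@"; do
        "$fn" &
        pids+=("$!")          # PID of the job just started
        names+=("$fn")
    done
    for i in "${!pids[@]}"; do
        if ! wait "${pids[$i]}"; then
            echo "FAILED: ${names[$i]}" >&2
            failed=$((failed + 1))
        fi
    done
    return "$failed"
}
```

For example, `run_parallel calculate_q_order calculate_q_checkout`.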

Aliases to save time

# An alias cannot take arguments ($1 would not be the table name), so use a function instead
del_davinci() { curl -X POST http://davincidata.corp.qunar.com/data/kudu/drop_pg_table -d "business=hotel&tableName=$1&password=******"; }
alias product_db='psql "host ={host} port =5432 user ={user} password ={password} dbname ={dbname} "'
alias ftp_enter='lftp {username}:{password}@{host}:{port}'

Brute-force backfill loops

Consecutive dates

#!/bin/bash

START_DATE='20200813'
END_DATE='20201011'
EXEC_DATE=${START_DATE}

while [ ${EXEC_DATE} -le ${END_DATE} ]
do
echo ${EXEC_DATE}
job_test rpt_qc_order_price_comparison/new_druid ${EXEC_DATE}
EXEC_DATE=`date -d "1 day ${EXEC_DATE}" +"%Y%m%d"`
done

Non-consecutive dates

#!/bin/bash

DATE_ARRAY=(
'20180727'
'20181016'
'20181018'
)

for EXEC_DATE in "${DATE_ARRAY[@]}"
do
echo ${EXEC_DATE}
job_test mdw_order_v3/all_extend ${EXEC_DATE}
done

Hourly runs

#!/bin/bash

for HOUR in {00..23} ; do
  echo 'run ...'
done

Update master, merge it into a branch, and push

#!/bin/bash
branch=$1

cd /home/q/home/junjieiot.wang/hotel-analysis-jobs || exit 1
if [ -z "$branch" ]; then # no branch given: list my branches
  git branch -a | grep junjieiot.wang
else
  echo "git push $branch"
  # stop at the first failing step (e.g. a merge conflict) instead of pushing anyway
  git checkout master && git pull &&
    git checkout "$branch" && git pull &&
    git merge master &&
    git push origin "$branch"
fi

Run in the background with screen

Detach from screen (the session keeps running)
Ctrl + a, then d
List sessions
screen -ls
Reattach to a session
screen -r 192218
Close a session (from inside it)
exit

Database/FTP connections

MySQL

mysql -u{username} -h{hostname} -P{port} -D{dbname} -p{password}

postgresql

psql "host ={host} port =5432 user ={user} password ={password} dbname ={dbname} "

ftp

lftp {username}:{password}@{host}:{port}
