Recently I had a requirement: compute daily new vs. returning users, plus DAU, WAU, and MAU.
Each day's incremental data is appended to a Hive history table of users' site-visit records. It has many fields, including guid, the unique user identifier.
DAU, WAU, and MAU are of course just a count(distinct guid) query, a very common piece of SQL.
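To make the metric concrete, here is a toy Python sketch of the same count(distinct guid) idea over different date windows; the sample rows and helper name are made up for illustration:

```python
# Toy illustration of DAU/WAU as distinct-guid counts over date windows,
# mirroring SQL's count(distinct guid). The sample data is hypothetical.
visits = [
    ('u1', '2018-07-01'), ('u1', '2018-07-01'),  # same user twice in one day
    ('u2', '2018-07-01'),
    ('u1', '2018-07-03'),
]

def distinct_users(rows, dates):
    # each guid is counted at most once within the window
    return len({guid for guid, day in rows if day in dates})

dau = distinct_users(visits, {'2018-07-01'})                             # -> 2
wau = distinct_users(visits, {'2018-07-%02d' % d for d in range(1, 8)})  # -> 2
```

The expensive part in production is not the counting itself but the repeated scanning and joining of raw visit rows, which is what the rest of the post addresses.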

But the problem here is:

Right: the history table holds raw site-visit events, so the same user can appear many times, both within a day and across days, and a given guid occurs repeatedly in the history table. Joining the raw tables directly performs very poorly, since most of the work is redundant.

Solution:

With this user table maintained, the statistics can be written in HQL. To compute the day's new vs. returning users, we only need to join against the history table (about 40 million rows so far). The day's guids number a bit over 10 million after deduplication, so it becomes a 40M-to-10M join instead of the original 40M-to-10B one, a huge performance gain.
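The core idea, deduplicate first and then split against a maintained history table, can be sketched with in-memory structures standing in for the Hive tables (all names and sample data below are illustrative, not the production schema):

```python
# Sketch of the new/old-user split against a maintained history table.
# history maps guid -> first visit, last visit, and visit-day count,
# mirroring the columns of history_helper. Sample data is made up.
history = {'u1': {'starttime': '2018-06-30', 'endtime': '2018-06-30', 'num': 1}}

today = '2018-07-01'
raw_rows = ['u1', 'u1', 'u2']     # today's raw visit log; guids repeat
daily = set(raw_rows)             # dedupe first: shrinks the join's small side

new_users = daily - set(history)  # guids never seen before
old_users = daily & set(history)  # returning users

# mirror the script's two Hive steps: update existing rows, insert new ones
for guid in old_users:
    history[guid]['endtime'] = today
    history[guid]['num'] += 1
for guid in new_users:
    history[guid] = {'starttime': today, 'endtime': today, 'num': 1}
```

The Hive version does the same thing with an `update ... where guid in (...)` followed by an `insert ... where guid not in (...)`, as shown in the script later in the post.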

Hive history table design and Hive configuration
As shown above, the history table history_helper needs to be modified frequently, and Hive tables only support data modification with transaction support enabled; add the following to ${HIVE_HOME}/conf/hive-site.xml:

<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
</property>
<property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value>
</property>

To speed up queries, both the history table and the daily incremental table are bucketed. In hive-site.xml:

<property>
    <name>hive.enforce.bucketing</name>
    <value>true</value>
</property>

To increase reduce parallelism, also set:

set mapred.reduce.tasks = 50;

This is best set on the Hive command line, which scopes it to the current session; don't put it in the config file.
DDL for the history table:

-- note: Hive requires transactional (ACID) tables to be managed, not external
create table if not exists hm2.history_helper
(
  guid string,
  starttime string,
  endtime string,
  num int
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ("transactional"="true");

DDL for the daily incremental table, which stores the day's deduplicated guids:

create table if not exists hm2.daily_helper
(
  guid string,
  dt string
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ("transactional"="true");

Approach

Since this needs to run on a schedule, it is implemented as a Python script: the Hive query results are saved to a local file, result.txt, which the script then reads before connecting to the database and storing the day's results.
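To preview how result.txt is consumed: the three Hive queries append, in order, today's new-user count, today's total deduplicated users, and the number of users active both yesterday and today. A minimal sketch of the ratio arithmetic (the counts below are made-up stand-ins):

```python
# Sketch of parsing result.txt and deriving the stored ratios.
# Line order: new-user count, total deduped users, retained-user count.
lines = ['3000000', '12000000', '5000000']  # stand-in for reading result.txt

new_cnt, day_all, stay_cnt = (int(v) for v in lines)
new_ratio = float(new_cnt) / day_all   # share of new users today
old_ratio = 1 - new_ratio              # share of returning users
# the retention ratio also needs yesterday's total, which the real
# script fetches from MySQL before dividing stay_cnt by it
```

The full script below wraps this parsing together with the MySQL lookup and insert.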

Code

helper.py

#!/usr/bin/python
# -*- coding:utf-8 -*-

# Update the Hive history user table, run the daily queries, and save
# the results to MySQL. (Python 2 script: uses the `commands` module.)

import sys
import datetime
import commands
import MySQLdb

# Return every date between starttime and endtime, inclusive
def getDays(starttime, endtime, regx):
    datestart = datetime.datetime.strptime(starttime, regx)
    dateend = datetime.datetime.strptime(endtime, regx)
    days = []
    while datestart <= dateend:
        days.append(datestart.strftime(regx))
        datestart += datetime.timedelta(days=1)
    return days

# Return the date n days away from `day`; negative n goes back, positive forward
def getExacYes(day, regx, n):
    return (datetime.datetime.strptime(day, regx) + datetime.timedelta(days=n)).strftime(regx)

# Return the date n days away from today (same sign convention as above);
# yesterday is getYes(regx, -1)
def getYes(regx, n):
    now_time = datetime.datetime.now()
    yes_time = now_time + datetime.timedelta(days=n)
    yes_time_nyr = yes_time.strftime(regx)
    return yes_time_nyr

# Run a hive command, returning (status, output)
def execHive(cmd):
    print cmd
    res = commands.getstatusoutput(cmd)
    return res

# Return the current day of the week
def getWeek(regx):
    now_time = datetime.datetime.now()
    week = now_time.strftime(regx)
    return week

# Wrap a date in double quotes for use in SQL
def formatDate(day):
    return "\"" + day + "\""

# Save the day's results to MySQL.
# result.txt holds, in order: new-user count, total deduped users, retained count
def insertMysql(dt, path, tbName, regx):
    values = []
    with open(path) as file:
        line = file.readline()
        while line:
            values.append(line.strip())
            line = file.readline()
    dayAll = int(values[1])
    new = float(values[0]) / dayAll
    old = 1 - new

    # Open the database connection
    conn = MySQLdb.connect("0.0.0.0", "statistic", "123456", "statistic")
    # Get a cursor
    cursor = conn.cursor()

    # Look up yesterday's total user count
    yesDay = getExacYes(dt, regx, -1)
    sql = 'select dayAll from %s where dt = %s' % (tbName, formatDate(yesDay))
    try:
        cursor.execute(sql)
    except Exception as e:
        print e

    # NOTE: assumes yesterday's row exists; on the very first processed day
    # fetchall() is empty and this raises IndexError
    yesAll = int(cursor.fetchall()[0][0])
    stay = float(values[2]) / yesAll
    print stay
    # Second cursor for the insert
    cursor2 = conn.cursor()
    sql = 'insert into %s\
    values("%s",%f,%f,%f,%d)' % (tbName, dt, new, old, stay, dayAll)
    print sql
    try:
        cursor2.execute(sql)
        conn.commit()
    except:
        conn.rollback()
    finally:
        conn.close()

# Initialization: drop the previous temporary table and recreate it
def init(day):
    # Set the bucketing environment.
    # NOTE: `set` only lasts for a single hive session, so this one-shot
    # command has no effect on the later hive -e invocations; keep the
    # settings in hive-site.xml or prefix them to each query instead.
    cmd = 'source /etc/profile;hive -e \'set hive.enforce.bucketing = true;set mapred.reduce.tasks = 50;\''
    (status,result) = execHive(cmd)
    # Drop the previous day's temporary table
    cmd = 'source /etc/profile;hive -e \'drop table hm2.daily_helper;\''
    (status,result) = execHive(cmd)
    if status == 0:
        print '%s: old temporary table dropped...' % (day)
    else:
        print result
        sys.exit(1)
    cmd = 'source /etc/profile;hive -e \'create table if not exists hm2.daily_helper\
    (\
    guid string,\
    dt string\
    )\
    clustered by(guid) into 50 buckets \
    stored as orc TBLPROPERTIES ("transactional"="true");\''
    (status,result) = execHive(cmd)
    if status == 0:
        print '%s: temporary table created...' % (day)
    else:
        print result
        sys.exit(1)

# Entry point
if __name__ == '__main__':
    regx = '%Y-%m-%d'
    resultPath = '/home/hadoop/statistic/flash/helper/result.txt'
    days = getDays('2018-07-01','2018-07-20',regx)
    tbName = 'statistic_flash_dailyActive_helper'
    for day in days:
        init(day)
        # Save the day's deduplicated guids into the temporary table daily_helper
        cmd = 'source /etc/profile;hive -e \'insert into hm2.daily_helper select distinct(guid),dt from hm2.helper \
        where dt = "%s" and guid is not null;\'' % (day)
        print '%s: loading data into the temporary table...' % (day)
        (status,result) = execHive(cmd)
        if status == 0:
            print '%s: temporary table loaded...' % (day)
        else:
            print result
            sys.exit(1)
        # If the guid already exists, update endtime and num
        cmd = 'source /etc/profile;hive -e \'update hm2.history_helper set endtime = "%s",num = num + 1 \
        where guid in (select guid from hm2.daily_helper);\'' % (day)
        print 'updating endtime and num...'
        (status,result) = execHive(cmd)
        if status == 0:
            print '%s: history_helper updated' % (day)
        else:
            print result
            sys.exit(1)
        # Count the day's new users (must run before the insert below,
        # while history_helper still holds only previously seen guids)
        cmd = 'source /etc/profile;hive -e \'select count(1) from hm2.daily_helper \
        where guid not in (select guid from hm2.history_helper);\' > %s' % (resultPath)
        (status,result) = execHive(cmd)
        if status != 0:
            print result
            sys.exit(1)
        # Insert guids not yet present in the history table
        cmd = 'source /etc/profile;hive -e \'insert into hm2.history_helper\
        select daily.guid,dt,dt,1 from hm2.daily_helper daily\
        where daily.guid not in (select guid from hm2.history_helper where guid is not null);\''
        print 'inserting data into history_helper...'
        (status,result) = execHive(cmd)
        if status == 0:
            print '%s: data inserted into hm2.history_helper' % (day)
        else:
            print result
            sys.exit(1)
        # Count the day's total (deduplicated) users
        cmd = 'source /etc/profile;hive -e \'select count(1) from hm2.daily_helper;\' >> %s' % (resultPath)
        (status,result) = execHive(cmd)
        if status != 0:
            print result
            sys.exit(1)
        # Next-day retention: users active both yesterday and today
        cmd = 'source /etc/profile;hive -e \'select count(1) from\
        (select guid from hm2.helper where dt = "%s" group by guid) yes\
        inner join\
        (select guid from hm2.helper where dt = "%s" group by guid) today\
        where yes.guid = today.guid;\' >> %s' % (getExacYes(day, regx, -1), day, resultPath)
        (status,result) = execHive(cmd)
        if status != 0:
            print result
            sys.exit(1)
        # Save the results to MySQL
        insertMysql(day, resultPath, tbName, regx)
        print '========================= %s: hive queries done, results saved to mysql ==============================' % (day)
      
      

That covers the historical backfill; for the ongoing daily runs, just add a scheduled job to the Linux crontab.
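For the scheduled daily run, the hard-coded backfill range in `__main__` collapses to a single day, namely yesterday. A minimal sketch of that change (the helper below just mirrors the script's own getYes(regx, -1)):

```python
import datetime

# For the cron-driven daily run, process only yesterday's partition;
# equivalent to the script's getYes(regx, -1) helper.
def get_yesterday(fmt='%Y-%m-%d'):
    return (datetime.datetime.now() - datetime.timedelta(days=1)).strftime(fmt)

days = [get_yesterday()]  # would replace the fixed date range in __main__
```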

10-03 21:32