MaxCompute表设计最佳实践

产生大量小文件的操作

MaxCompute表的小文件会影响存储和计算性能,因此我们先介绍下什么样的操作会产生大量小文件,从 而在做表设计的时候考虑避开此类操作。

  • 使用MaxCompute Tunnel SDK上传数据,上传过程中,每commit一次就会产生一个文件。这时每 个文件过小(比如几K),并且频繁上传(比如5秒上传)一次,则一小时就会产生720个小文件,一 天就会产生17280个小文件。
  • 使用MaxCompute Tunnel SDK上传数据,create了session但是没有upload数据直接做了 commit,产生大量空目录(服务侧等同于小文件)。
  • 使用MaxCompute Console命令行工具Tunnel命令上传时,将本地大文件切分过小,导致上传后产 生文件数过多,文件过小。
  • 通过DataHub做数据归档,Datahub 的每个shard写 MaxCompute 有两个条件:数据总量达到 64MB,commit 一次到 MaxCompute,形成一个文件。或者每隔 5 分钟一次 commit,形成一个 文件。那么:开的shard数多(比如20个shard),每个shard数据在5分钟内都远远达不到64M,比 如就是几百K,就会产生大量小文件。那么一天就会产生241220=5760个小文件。
  • 通过Dataworks等数据开发工具进行数据增量插入(insert into)到MaxCompute的表(或者表分 区)里时,每个insert into都会产生一个文件,若每次insert into 10条,每天累计insert insert 10000条记录,则会产生1000个小文件。
  • 通过阿里云DTS将数据从RDS等数据库同步到MaxCompute,DTS进行数据同步时,会创建全量表和 增量表,增量表进程数据插入过程中会因为每次数据插入条数较少而commit比较完整一次数据同步, 从而在增量表中造成小文件问题,比如每隔5分支执行一次同步,每次同步的数据量为10条,一天内的 增量为10000条,则会产生1000个小文件。此种场景,需要在数据同步完成后进行全量极限表和增量 数据表的merge。
  • 源数据采集客户端太多,源数据通过T unnel直接进入到一个分区,每个源数据采集客户端提交一次数 据,都会在同一分区下产生一个独立的文件导致产生大量小文件。
  • SLS 触发 FunctionCompute持续高频往MaxCompute中心接入文件,小文件流式数据进入 MaxCompute。

根据数据划分项目空间

项目空间(Project)是MaxCompute最高层的对象,按项目空间进行资源的分配、隔离和管理,实现了 多租户的管理能力。

  • 如果多个应用需要共享“数据”,则推荐使用同一个项目空间。
  • 如果多个应用所需“数据”是无关的,则推荐使用不同的项目空间。 项目空间间的表和分区可以通过Package授权的方式进行交换。

“维度表”设计的最佳实践:

一般情况下描述属性的表设计为维度表。维度表可以和任意表组的任意表进行关联,并且创建时不需要配 置分区信息,但是对单表数据量大小有所限制。维度表的设计和使用注意以下几点:

  • 一般要求维度表单表不超过1000万。
  • 维度表的数据不应该被大量更新。
  • 维度表和其他表进行Join操作时可以使用mapjoin。

拉链表设计 – 极限存储的应用

极限存储功能待发布,在此介绍主要提供设计思想。 基于MaxCompute的拉链表设计背景 在数据仓库的数据模型设计过程中,经常会遇到这样的需求:

  • 数据量比较大。 表中的部分字段会被update,如用户的地址,产品的描述信息,订单的状态、手机号码等等。
  • 需要查看某一个时间点或者时间段的历史快照信息。(比如,查看某一个订单在历史某一个时间点的 状态,比如,查看某一个用户在过去某一段时间内,更新过几次等等)
  • 变化的比例和频率不是很大,比如,总共有1000万的会员,每天新增和发生变化的有10万左右,如果表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费。 
    考虑极限存储的使用: MaxCompute提供了将不同表转化为极限存储表的能力。极限存储操作示例如下:
  1. 创建源表。
create table src_tbl (key0 STRING, key1 STRING, col0 STRING, col1 STRING, col2 STRING) PARTITIO N (datestam p_x STRING, pt0 STRING);
  1. 导入数据。
  2. 将src_tbl转变为极限存储的表。
set odps.exstore.primarykey=key0,key1;
[set odps.exstore.ignorekey=col0;]
EXSTO RE exstore_tbl PARTITIO N (datestam p_x='20140801'); EXSTO RE exstore_tbl PARTITIO N (datestam p_x='20140802');

拉链表设计更详细介绍可以参考云栖文章:https://yq.aliyun.com/articles/542146#? spm=a2c41.11181499.0.0

采集源表的设计

数据采集方式:流式数据写入, 批量数据写入,周期调度条式数据插入。 
大数据量情况下,确保同一个业务单元的数据使用分区和表进行分;在数据量较小情况下,优化采集频率。

  • 流式数据写入。

    • 对于流式写入的数据,一般采集的通道较多,相关采集通道应做有效区分,在单个数据通道写入 量较大的情况下应该进行按照时间进行分区设计。
    • 在采集通道数据量较小的情况下可以采取非分区表设计,对终端类型和采集时间设计成标准列字 段。
    • 采用Datahub进行数据写入时应该合理规划shard数量,放置由于shard过多而造成采集通道流量 较小且通道较多的问题。
  • 批量数据写入。批量数据写入重点关注写入周期 周期调度条式数据插入。
  • 避免周期数据插入,此种情况下需要建立分区表,在新分区进行插入操作, 减少对于原来分区影响。

日志表的设计

日志其实是个流水表,不涉及记录的更新,来一条采集一条,多条一起存放,日志表设计的主要注意几 点:

create table src_tbl (key0 STRING, key1 STRING, col0 STRING, col1 STRING, col2 STRING) PARTITIO N (datestam p_x STRING, pt0 STRING);
  set odps.exstore.primarykey=key0,key1;
[set odps.exstore.ignorekey=col0;]
EXSTO RE exstore_tbl PARTITIO N (datestam p_x='20140801'); EXSTO RE exstore_tbl PARTITIO N (datestam p_x='20140802');
  • 考虑是否需要对日志进行去重处理。
  • 考虑是否需要扩展维度属性。

    • 是否需要关联维表扩展维度属性字段考虑两点:业务使用的频次,关联是否会造成的产出的延 迟。
    • 需要仔细选择是否对于维度表进行扩展
  • 考虑区分终端类型。

    • 日志表由于量大,考虑在业务分析使用时通常会按PC端,APP端来统计分析,同时PC端、APP端 的采集是两套体系,因此通常的做法会按终端设计多个明细DWD表。
    • 如果终端较多,但数据量不大的情况下,如一个终端的数据小于1T 但是采集次数较多,可以考虑 不对终端进行分区,而设置终端信息为普通列。

注意:

  • 对于日志表进行分区设计,可以按照日志采集的时间按照天进行分区,在入数据前进行数据采集整 合,一批数据写入提交一次(通常是64M)。
  • 日志数据很少有对原来分区的更新操作,可以采用insert 进行少量数据的插入,但一般需要限制插入 次数。
  • 如果有大量的更新的操作,需要采用insert overwrite操作避免小文件问题。
  • 对日志表设置合理的分区和对已经⻓久不访问的冷热数据配置归档操作。

互动明细表的设计

周期快照表,每天对收藏的所有记录进行快照存放。 
问题:历史累计的记录非常多,每天生成快照要拿当天增量表与前一天的全量表merge,非常耗资源。统 计最近1天的新增收藏数,需要扫描全量表,如何降低资源? 
建议的方案:建立一个事务性事实表,在建立一个存放当前有效收藏的周期快照表,以满足各种不同业务 的统计分析需要。
注意:

  • 设计互动明细表最重要的是要区分存量数据和增量数据之间的关系。 - 对于新分区的数据可以写入,作为增量数据。
  • 应尽量减少对于老的分区数据的修改和插入。
  • 在数据插入和全表覆盖写种选择时应尽量选用insert overwrite而并选择insert into。

MaxCompute表数据更新与删除操作

关系型数据库支持的 delete/update/merge SQL ,在MaxCompute上的实现方式示例如下: 
表准备

-- 上日全量表
table1(key1 string,key2 string,col1 string,col2 string);
-- 今日增量表
table2(key1 string,key2 string,col1 string,col2 string);
​-- 今日增量表(删除)
table3(key1 string,key2 string,col1 string,col2 string);

update(table2 表中的记录的值,更新到table1表中)

insert overwrite table table1 select t1.key1
,t1.key2
,case when t2.key1 is not null then t2.col1 else t1.col1 end as col1 ,case when t2.key1 is not null then t2.col2 else t1.col2 end as col2
from table1 t1
left outer join table2 t2 on t1.key1=t2.key1 and t1.key2 = t2.key2 ;

delete(table2 表中的记录,从table1表中删除)

insert overwrite table table1 select t1.key1
,t1.key2 ,t1.col1 ,t1.col2
from table1 t1
left outer join table2 t2 on t1.key1=t2.key1 and t1.key2 = t2.key2 where t2.key1 is null
;

merge(没有del)

insert overwrite table table1 select
from (
-- 先把上日存在,今日也存在的记录从上日表中排除。剩下的就是今日没有更新的记录 select t1.key1
,t1.key2 ,t1.col1 ,t1.col2
from table1 t1
left outer join table2 t2 on t1.key1=t2.key1 and t1.key2 = t2.key2 where t2.key1 is null
union all
-- 再合并上今日增量,就是今天的全量 select t2.key1
select t2.key1
  ,t2.key2
  ,t2.col1
  ,t2.col2
from table2 t2)tt
;

merge(有del)
insert overwrite table table1 select
from (
-- 先把上日存在,今日也存在的记录从上日表中排除,再把今日删除的记录排除。剩下的就是今日没有更 新的记录

insert overwrite table table1 select
from (
-- 先把上日存在,今日也存在的记录从上日表中排除,再把今日删除的记录排除。剩下的就是今日没有更 新的记录
select t1.key1
,t1.key2 ,t1.col1 ,t1.col2
from table1 t1
left outer join table2 t2 on t1.key1=t2.key1 and t1.key2 = t2.key2 left outer join table3 t3 on t1.key1=t3.key1 and t1.key2 = t3.key2
where t2.key1 is null or t2.key1 is null
union all
-- 再合并上今日增量,就是今天的全量 select t2.key1
,t2.key2 ,t2.col1 ,t2.col2
from table2 t2)tt ;

表创建设计示例

场景:天气情况信息采集。

  • 基本信息: 数据信息包括地名,关于此地的属性数如面积,基本人口数量等信息,天气信息。
  • 属性数据变化较小,天气信息数采用多个终端采集,且数据量较大
  • 天气信息变化较大,终端数量稳定的情况下流量基本稳定。
    表设计指南:
  • 建议对数据信息划分为基本属性表,和天气日志表,区分变化小和变化大的数据。
  • 因为数据量巨大,对天气日志表按照地域进行分区,也可以按照时间如天进行二级分区,此种分区方 式避免因某一地或某一个时间的天气变化而造成其他无关数据变化。
  • 采集终端上使用datahub进行数据汇聚,依据稳定的流量值选择合适的shard通道数量,批量数据方式 写入到天气日志表中,不使用Insert into。

MaxCompute表的特色功能

生命周期

MaxCompute表/分区提供数据生命周期管理。表(分区)数据从最后一次更新时间算起,在经过指定的 时间后没有变动,则此表(分区)将被MaxCompute自动回收。这个指定的时间就是生命周期,生命周期 设置为表级别。

create table test_lifecycle(key string) lifecycle 100;/alter table test_l ifecycle set lifecycle 50;

MaxCompute会根据每张非分区表或者分区的的LastDataModifiedTime以及lifecycle的设置来判断是 否要回收此非分区表或者分区表中的分区。 MaxCompute SQL提供touch操作用来修改分区的 LastDataModifiedTime。会将分区的LastDataModifiedTime修改为当前时间。修改 LastDataModifiedTime的值,MaxCompute会认为表或分区的数据有变动,生命周期的计算会重新开始。

ALTER TABLE table_nam e TO UCH PARTITIO N(partition_col='partition_col_valu e', ...);

注意:

  • 合理规划表的生命周期,在创建表时即设置生命周期,可有效减少存储压力。
  • 对表数据的任何变动都会影响生命周期回收数据的判断时间,包括小文件合并。

避免全表扫描

表设计:

  • 建立分区表或者对扫描条件进行列设计。
  • 对数据表进行合理分区。 对常用查询条件设置成列名。
  • 读常用查询条件进行hash clustering
    数据计算:
  • 加分区过滤条件,或者减少扫描分区数,或者拆出中间小表然后再扫描小表的历史分区以减少数据扫描 量。
  • 把全局扫描表中间结果进行存储形成中间表。
  • 如果每天都去扫一年的分区,计算消耗是非常大的,建议拆出一张中间表,每天做一次汇总,然后再 去扫描这张中间表的一年分区,扫描数据量会减少很多。

避免小文件

  • Reduce计算过程产生的小文件:只需要insert overwrite源表(或分区)即可,或者写入到新表删除 源表。
  • Tunnel数据采集过程中产生的小文件建议:

    • 调用tunnelsdk时当buffer达到64M时提交一次;
    • 使用console时避免频繁上传小文件,建议积累较大时一次性上传; 如果导入的是分区表,建议给分区设置生命周期,过期不用的数据自动清理;
    • 同第一种方案,insertoverwrite源表(或分区);
    • ALTER合并模式,通过console命令进行合并。
  • 使用临时表建议创建时都加上生命周期,到期后垃圾回收自动回收。 - 申请过多的datahub shard将会产生小文件问题,申请datahub shard数目的策略 :

    • 默认吞吐量单个shard是1MB/s,可以按照这个分配实际的shard数目(可以在此基础上多加几 个);
    • 同步odps的逻辑是每个shard有一个单独的task(满足5分钟或者64MB会commit一次),默认设置5分钟是为了尽快能在odps查到数据。如果是按照小时建partition,那个一个shard每个小 时有12个文件。
    • 如果这个时候数据量很少,但是shard很多,在odps里面就会很多小文件(shard*12/hour)。
    • 不要过多的分配shard,按需分配。

转化Hash Clustering表

Hash Clustering表的优势:优化Bucket Pruning/优化Aggregation/优化存储。 在创建表时使用CLUSTERED BY指定Hash Key,MaxCompute将对指定列进行Hash运算,按照Hash 值分散到各个Bucket里面。
Hash Key指选择原则:

  • 选择重复键值少的列
  • SORTED BY用于指定在Bucket内字段的排序方式。
    如何转化为HashClustering表:
ALTER TABLE table_nam e [CLUSTERED BY (col_nam e [, col_nam e, ...]) [SO RTED B Y (col_nam e [ASC | DESC] [, col_nam e [ASC | DESC] ...])] INTO num ber_of_buck ets BUCKETS]

ALTER TABLE语句适用于存量表,在增加了新的聚集属性之后,新的分区将做hash cluster存储。 创建 完HashClustering的表之后使用insert overwrite从另外一个源表进行转化。
注意,Hash Clustering表有以下限制:

  • 不支持insert into,只能通过insert overwrite来添加数据。
  • 不支持tunnel直接upload到range cluster表,因为tunnel上传数据是无序的。

原文链接 

12-19 20:27