本文介绍了druid的基础架构以及工作过程,通过一个应用案例加深了解。

durid简介

druid是一种高性能、列式存储、分布式数据存储的时序数据分析引擎。能支持“PB”级数据的秒级查询。类似的产品有kylin/clickhouse。druid典型的应用就是OLAP场景下的cube组合查询分析。如数据钻取(Drill-down)、上卷(Roll-up)、切片(Slice)、切块(Dice)以及旋转(Pivot)。后面的应用示例章节再详细阐述。

durid基础架构

先来了解一下durid主要节点:

1、broker node(代理节点)

Broker节点扮演着历史节点和实时节点的查询路由的角色。主要负责接收外部查询,转发查询至各个segment数据所在的节点,并聚合结果返回。

2、historical node(历史节点)

historical主要负责历史数据存储和查询,接收协调节点数据加载与删除指令,从deepstoage中下载segment,完成数据加载或者删除后在zk中进行通告。历史节点遵循shared-nothing的架构,因此节点间没有单点问题。节点间是相互独立的并且提供的服务也是简单的,它们只需要知道如何加载、删除和处理不可变的segment。historical节点也可以进行分组,组合成不同的historical tier。这会在集群规模较大的时候体现出优势。如做数据的冷热分离,按不同业务的数据分离(一定程度的资源隔离)。当然,historical 节点是整个集群查询性能的核心所在,因为historical会承担绝大部分的segment查询。

3、coordinator node(协调节点) 

主要负责数据的管理和在历史节点上的分布。协调节点告诉历史节点加载新数据、卸载过期数据、复制数据、和为了负载均衡移动数据。可以配置load数据及drop数据规则。 

4、overlord node(index service 可以理解为任务管理节点) 

功能描述:负责接收任务,管理任务。接收外部http请求(新建任务、查询任务状态、kill任务等),分配管理任务(当有新的任务请求,overload node会将任务分配给middleManager node去执行)。

5、middleManager node(可以理解为overlord节点的工作节点)  

功能描述:可以启动n(可配置)个peon,接收overlord分配的task,再交给自己peon去执行。

 

druid 的基础架构与应用-LMLPHP

 

 

 

 

查询过程

见上图蓝色箭头,Broker节点接收到查询(Q1),再将查询发送给历史节点与实时节点(Q2,Q3),在上图的模式中,实时节点是MM节点上启动的task。该task会负责数据的摄入以及提供实时数据的查询。

数据摄入过程

见上图红色箭头,D1是client生产数据最终写入kafka(这个过程可能在client与kafka的中间,还包含了多个环节,如数据传输与数据清洗),D2和D3过程是部署tranquility-kafka服务,消费kafka数据写入对应的task,tranquility-kakfa启动的时候会跟overlord节点通信,由overlord节点分配任务给middleManager执行。D4是task 负责的segment段正常结束,然后将segment数据写入deepstorage过程。(实时task运行时间是segmentGranularity+windowPeriod+intermediatePersistPeriod)。D5则是historical节点从deepstorage下载segment并在zk中声明负责该segment段查询的过程。 

目前druid数据摄入过程还有一种更推荐的方式就是kafka index service(简称kis),有兴趣的同学可以参考官方文档,kis对kafka的版本有强要求。

druid整体架构虽然略为复杂,但是整体稳定性非常不错,几乎很少出现集群故障。抛开集群硬件故障和数据本身问题,SLA基本能到4个9。coordinator,overlord两个节点是主从模式,保证每个角色起两个实例即可。broker节点无状态,可以起多个实例,前面挂个域名即可(为了保证缓存命中,最好配置ip hash)。historical节点无状态,有一定冗余即可。middleManager用作数据摄入节点,若task没有配置副本,则节点宕机会引发丢数据的风险。当然,kis可以避免该问题。

durid数据聚合、存储核心思想

druid 数据存储分为三部分timestamp、dimensions、metrics。其中,timestamp、metrics部分是采用lz4直接压缩。

但是dimensions部分需要支持过滤查询以及分组查询。所以dimensions部分的每个维度都采用了以下三种数据结构做转码、存储:

  1. A dictionary that maps values (which are always treated as strings) to integer IDs,
  2. For each distinct value in the column,a bitmap that indicates which rows contain that value,and
  3. A list of the column’s values,encoded using the dictionary in 1

举个例子,源数据如下:

 

druid 的基础架构与应用-LMLPHP

 

 

 

 

name列来说 

1. Dictionary that encodes column values 

字典表的key都是唯一的,所以Map的key是unique的column value,Map的value从0开始不断增加。 示例数据的name列只有两个不同的值。所以张三编号为0,李四编号为1:

{
    "张三": 0
    "李四": 1
  }

2. Column data 

要保存的是每一行中这一列的值,值是ID而不是原始的值。因为有了上面的Map字典,所以有下面的对应关系:  

[0, 

1, 

1, 

0]  

3. Bitmaps - one for each unique value of the column 

BitMap的key是第一步Map的key(原始值), value则是真假的一个标识(是|否?等于|不等于?),取值只有0、1,如下:

value="张三": [1,0,0,1]  

value=“李四": [0,1,1,0]

所以由上可知最坏的情况可能是随着数据量的增加,bitmap的个数也成线性增长,为数据量大小*列的个数。那么在什么情况下会导致这种线性增长?这里我们引入了一个基数(cardinality)的概念。基数=unique(dim1,dim2.....),如若dim取值均为各种爆炸性id或者随机数,则druid的预聚合将完全失去意义。所以在druid的应用场景中,基数约小,聚合效率越高。

讲了dimensions怎么存储,那么metrics又是怎么聚合(roll-up)呢?这就要引入druid数据schema定义了。下一章结合应用一块看一个示例。

应用示例与实践经验

假设有这样一份数据,典型的商品销售数据。

 

druid 的基础架构与应用-LMLPHP

 

 

 

 

我们构造成druid中的数据schema如下:

{
  "dataSources" : [ {
    "spec" : {
      "dataSchema" : {
        "dataSource" : "test_datasource",
        "granularitySpec" : {
          "segmentGranularity" : "hour",
          "queryGranularity" : "minute",
          "type" : "uniform"
        },
        "parser" : {
          "type" : "string",
          "parseSpec" : {
            "format" : "json",
            "timestampSpec" : {
              "column" : "time",
              "format" : "auto"
            },
            "dimensionsSpec" : {
              "dimensions" : [ "productName", "city", "channel", “action"]
            }
          }
        },
        "metricsSpec" : [ {
          "name" : "count",
          "type" : "count"
        },  {
          "type" : "doubleSum",
          "fieldName" : "price",
          "name" : “sale"
        } ]
      },
      "tuningConfig" : {
        "type" : "realtime",
        "windowPeriod" : "PT10M",
        "intermediatePersistPeriod" : "PT10M",
        "maxRowsInMemory" : "100000"
      }
    },
    "properties" : {
      "topicPattern" : "test_datasource",
      "task.partitions" : "2",
      "task.replicants" : "1"
    }
  } ],
  "properties" : {
    ...
  }
}

前面重点说了dimensions,我们再来看下metrics。在上面的例子中我们只定义count和针对price的doubleSum,那么这些指标就已经固定了后期的分析需求。我们看到上面table中的一二行标红部分,所有dim取值完全相同,queryGranularity为一分钟。那么在这2018-06-11 12:23:00这个点,这两行数据就被聚合成一行,count=2,sale=0。以此类推。 

然后我们再来看看具体的分析需求,一个钻取的例子。我们首先查看商品A昨天的点击量,select sum(count) from table where productName=‘A’ and action=‘click',再想看看地区=北京,渠道=web呢?是不是再加几个where就搞定了?select sum(count) from table where productName=‘A’ and city=‘北京’ and channel=‘web' and action=‘click’; 然后就是切片和切块,也很简单,就是几个group by。这些在druid中都能非常轻松的支持。

具体使用上的经验总结:

1. reindex思想。一般我们实时数据查询粒度配置的会比较小,秒级或者分钟级。那么对于一天前,三天前,一个月前的数据呢?这时候一般关注的粒度将不再那么细,所以我们一般会采取redinx的策略进行再聚合 

2. 针对历史数据,可能对于某些维度将不在关心,这时候我们也可以在reindex时,将无用的维度剔除掉,可能大大减少整体数据的基数。 

3. 一般数据压缩比例。这里提供一个大概的参考值。数据总基数在10W以下,每天数据量约百亿左右,druid中聚合后的索引数据与原始数据大小之比可以到1:100,甚至1:1000。 

4. druid适用于常规的olap场景,能非常轻松的支撑每天百亿甚至千亿级别的数据写入。 

5. 爆炸性维度数据,以及频繁update数据的需求,不适用于druid的场景。

总结

本文主要对druid做了入门级的基础介绍,可以给大家做olap引擎技术选型时做一个参考。以及对druid的初学者做一个大致介绍。druid是一款非常优秀的olap引擎,从性能、稳定性上来说,都是非常不错的。

10-06 15:03