https://www.cnblogs.com/warehouse/p/9521382.html   参考

        Kafka是0.10新引入的一个Feature。它是提供了对存储于Kakfa内的数据进行流式处理和分析的功能。使得Kafka不再仅是一个消息引擎,而是往一个分布式流处理平台方向发展。

如下特点

简单轻量级的Library,非常方便的嵌入到Java应用中,可以任意方式打包和部署。

除了Kafka外,无任何依赖。

充分利用Kafka的分区机制实现水平扩展和顺序性保证。

通过可容错的stage strore实现高效的状态操作(比如windowed join 和agreegation)

支持正好一次处理语义

提供记录级的处理能力,从而实现毫秒级的低延时。

支持基于事件事件的时间窗口,并且可以处理晚到的数据。

同时提供底层的处理原语Processror(类似于Strom的spout和bolt),以及高层抽象的DSL(类似Spark的map/group/reduce)

流式计算:

       在流式计算模型中,输入是持续的,可以认为时间是无界的,也就意味着永远拿不到最全的数据进行计算.同时,计算结果也是持续输出的,也即计算结果上是无界的。流式计算一般对实时结果要求较高,一般也是先定义目标计算,然后数据到来之后将计算逻辑用于数据。

为什么要使有Kakfa

当前已经有非常多的流式处理系统。最知名且应用最多的开源流式处理系统有Spark Stream和Apache Strom。Apache Strom应用广泛,提供记录级别的处理能力。当前也支持SQL  on Stream. 而Spark Stream基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于其他熟悉Spark应用开发的人员而言使用门槛低。

   既然Apache Spark与Apache Strom拥有如此多的优势,那为何还需要Kafka Strom呢,主要原因如下:

1.Spark 和 Strom是流式处理框架,而Kafak Stream是一个基于Kafka的流式处理类库。 框架要求开发者按照特定的方式去开发逻辑,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高。Kafka Stream作为一个流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者调用。整个应用的运行方式有开发者控制

2.Kafka部署简单

3.就流式处理而言,基本都支持Kafka作为数据源。例如Strom有专门的kafka-spout,而Spark也提供专门的spark-kafka-stream模块。事实上,kafka基本上是主流的流式处理标准数据源。大部分流失系统中都已经部署了Kafka,此时使用Kafka的成本非常低。

4.由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。

5.由于Kafka的Consumer的Rebanace机制,Kafka可以在线动态调整并行度。

Processor Topology

        基于kafka stream的流式应用逻辑全部通过一个被称为Processor Topology的地方执行。它与strom的Topology和Spark的DAG类似。都定义了数据在各个处理单元间的流动方式,或者说定义了数据的处理逻辑。

public class WordCountProcessor implements Processor<String, String> {
  private ProcessorContext context;
  private KeyValueStore<String, Integer> kvStore;
  @SuppressWarnings("unchecked")
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
  }
  @Override
  public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
      Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
      int count = counts.map(wordcount -> wordcount + 1).orElse(1);
      kvStore.put(word, count);
    });
  }
  @Override
  public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
      context.forward(entry.key, entry.value);
      this.kvStore.delete(entry.key);
    });
    context.commit();
  }
  @Override
  public void close() {
    this.kvStore.close();
  }
}

从上述代码中可见:

1.processor定义了每条记录的处理逻辑,也验证了Kafka具有记录级的数据处理能力。

2.context.scheduler定义了processor被执行的周期,从而提供了实现操作窗口的操作。

3.context.getStateStore提供的状态存储为有状态的计算(如窗口、聚合)提供了可能。

Kafka Strom的并行模型:

        并行模型中,最小粒度为Task,而每个Task包含一个特定子Topology的所有Processor。因此每个Task所执行的代码完全一样,唯一的不同在于所处理的数据集互补。Kafka Stream的Task包含了完整的子Topology,所以Task之间不需要传递数据,也就不需要网络通信。这一点降低了系统复杂度,也提供了系统处理效率。

KTable vs KStream 

KStream<String, String> stream = builder.stream("words-stream");
KTable<String, String> table = builder.table("words-table", "words-store");

KTable和KStream 是非常重要的两个概念。他们是Kafka实现各种语义的基础。因此这里有必要分析一下二者的区别。

KStream是一个数据流,可以认为所有记录都通过insert only 的方式插入到进入这个数据流里面。KTable代表一个完整的数据集,可以理解为数据库中的表。由于每条记录都有Key-Value对,这里可以将key理解为数据库中的Primary Key,而value可以理解为一行记录。可以认为KTable中的数据都是通过update only方式进入的。也就意味着,如果KTable对应的Topic中新进入的数据Key已经存在。那么KTable自会取出同一个Key对应的最后一条记录,相当于新的数据更新了旧的数据。

State Store

        流式处理中,部分操作是无状态的。例如操作过滤,而部分操作是有状态的,需要记录中间状态,如Windows操作和聚合操作。State Store被用来存储中间状态。他可以是一个持久化的key-value存储,也可以是内存中的HashMap或者数据库。Kafka提供了基于Topic的状态存储。

       Topic中存储的数据本身是key-value形式的,同时Kafka的log commpacting机制对历史数据做compact,保留每个key的最后一个value,从而在key不丢失的情况下,减少数据总量。提升查询效率。

       构造KTable时候,需要制定其state store name.默认情况下,该名字也即用于存储该KTable的状态的Topic的名字。遍历KTable的过程,实际就是遍历它对应的state store,或者遍历Topic的所有key。

03-28 12:03