Kafka消息队列

Kafka消息队列

一、概念

Kafka是一个基于发布订阅者模式的消息队列,实现数据缓存、流量削峰等等功能。

在大数据生态下,Kafka主要是用来和实时计算框架对接去处理海量的实时数据

1.1 消息队列一般有两种架构

1.1.1 点对点模式
1.1.2 发布/订阅者模式

二、架构

2.1 生产者producer

生产消息、数据

Kafka的生产者,生产者是为了给topic的partition生产数据的,生产者可以是Flume,也可以是我们自定义的操作,还可以是Kafka自带的控制台生产者。 生产者生产的数据放到Kafka主题的哪个分区? 生产者生产的数据都是key和value格式类型的数据,只不过key值可以不存在。

  1. 如果生产的消息只有value,没有key,那么消息会采用轮询机制选址一个主题分区放入数据\

  2. 如果生产的消息key和value都有,但是没有指定分区机制,会按照key的hashcode值和topic的分区数取一个余数,放到对应的分区

  3. 如果生产的消息key和value都有。那么为了避免数据倾斜,我们可以自定义分区机制

2.2 Kafka集群cluster

2.2.1 Broker

Kafka集群的一个节点,每一个broker节点都会有一个唯一的编号

2.2.2 Topic

主题,就是消息队列中的消息队,一个Broker中可以存在多个主题,一个主题也可以存在于多个Broker上

Kafka中消息主题,就是消息队列,是Kafka用来存储消息的组件,topic中存放的数据是有序的

2.2.3 Partition

分区,每一个Topic主题都可以指定存储的分区数,一般情况下,一个Broker会存储一个主题的一个分区数据,而且每一个分区还可以设置副本数保证存储数据的安全性,分区和分区副本之间有一个主从架构关系。分区副本数不能随便设置,必须小于等于broker的数量。

Topic存放的消息最底层是以分区的形式存在的,Topic所谓的数据有序,不是整体有序,而是每一个分区内部是有序的。分区设置副本机制的,副本数量必须小于等于broker的节点数量 Kafka主题分区的数据不是永久存在的,而是有一个数据清理机制(基于时间的清理机制、基于分区数据大小的清理机制)

2.2.4 zookeeper

Kafka中主题、分区、消费者等等元数据信息都是交给zookeeper统一管理的

2.3 消费者Consumer

订阅主题,消费数据

消费者:消费数据的最小单位,但是一个消费者可以订阅多个topic的数据

2.4 消费者组Consumer Group

将多个消费者组合起来,同时消费同一个主题的数据

消费者组:一个消费者组可以有多个消费者,其中topic一个分区的数据只能被消费者组的一个消费者消费,如果我们想要让一个消费者消费topic所有分区的数据,那么我们需要保证消费者组中只有一个消费者。

三、Kafka的安装

3.1 解压、重命名

3.2 修改Broker配置文件

server.properties

3.2.1 启用删除主题的功能
3.2.2、logs日志文件目录的配置
3.2.3、分区日志文件的滚动和删除规则
3.2.4、broker的编号
3.2.5、配置zookeeper的地址

四、Kafka的启动和关闭

4.1 启动

4.1.1 启动zookeeper
4.1.2 启动kafka

kafka-server-start.sh /opt/xxxx/server.properties &

4.2 关闭

kafka-server-stop.sh

五、kafka的基本使用

5.1 Kafka的命令行操作方式

5.1.1 主题的操作
5.1.1.1 创建主题

kafka-topic.sh --create --topic topicName --partitions num --replication-factor num<=borkerCount --zookeeper zkserverxxx

5.1.1.2删除主题

kafka-topic.sh --delete --topic topicName --zookeeper xxx 必须开启主题的删除功能

5.1.1.3修改主题

kafka-topic.sh --alter --topic topicName --partitions num --zookeeper xxx 主题分区数一般只能增加

5.1.1.4查询某个主题的详细信息

kafka-topic.sh --describe --topic topicName --zookeeper xxx

5.1.1.5查询所有的主题

kafka-topic.sh --list --zookeeper xxx

5.1.2 生产者的操作

kafka-console-producer.sh --bootstrap-server ip:9092,ip:9092 --topic topicName

5.1.3 消费者的操作

kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic topicName --group groupName

5.2 Kafa的Java API操作方法

5.2.1 主题的操作

主要通过AdminClient类来实现kafka的各种操作

创建AdminClient类需要写一个配置项 bootstrap.servers

5.2.2 生产者的操作

生产者:KafkaProducer ProducerRecord

生产者创建需要赋予一些参数:参数的key都是在ProducerConfig类中封装的

5.2.3 消费者的操作

消费者:KafkaConsumer ConsumerRecords ConsumerRecord

消费者创建需要赋予一些参数:参数的key都是ConsumerConfig类中封装的

5.2.4 earliest、latest区别

六、kafka的可视化监控工具

offset explorer

kafka eagle

七、kafka和Flume的整合

后期我们在做实时计算的时候,我们经常会做如下操作,会通过Flume采集相关数据到Kafka中缓存,然后再使用实时计算框架对接kafka进行计算。

Flume采集的数据给kafka,那么此时也就意味着Flume就相当于是Kafka的生产者,kafka相当于是Flume的sink下沉地

kafka除了当作flume的sink,也可以充当flume的source和channel

八、Kafka和Spark Streaming的整合

通过Spark Streaming消费Kafka中的数据,然后对数据进行实时计算处理

8.1 Spark Streaming整合Kafka有两种方式

8.1.1 Receiver模式

采用一个Reciver接受器去接受Kafka的数据,然后数据缓存到Spark的executor内存中,这种方式很容易出现数据丢失问题,如果想要实现数据的安全性,需要开启Spark的 WAL预写日志机制保证数据的安全性 receiver模式连接的zookooper实现

8.1.2 Direct模式

不需要接收器,直接连接Kafka节点获取数据,同时由Spark自动维护offset偏移量,此时我们不需要担心数据丢失。

8.2 整合步骤

8.2.1 引入一个编程依赖

spark-streaming-kafka-0.10/0.8 在Spark3版本之后,在KafkaUtils中把receiver模式移除了

九、整合案例的流程

想使用Flume采集端口的数据(以空格分割的单词)到kafka的某个主题中,然后借助Spark Streaming统计端口数据中每一个单词出现的总次数。

9.1 分析

9.1.1 编写Flume脚本

编写Flume脚本采集端口数据到Kafka中 sink指定到kafka即可,flume充当kafka的生产者

9.1.2 整合Spark Streaming

整合Spark Streaming代码读取kafka中的数据,此时Spark相当于kafka的消费者

十、相关代码

#1、起别名
demo.sources=s1
demo.channels=c1
demo.sinks=k1

#2、配置source数据源
demo.sources.s1.type=netcat
demo.sources.s1.bind=single
demo.sources.s1.port=44444

#3、配置channel管道
demo.channels.c1.type=memory
demo.channels.c1.capacity=20000
demo.channels.c1.transactionCapacity=10000

#4、配置sink下沉地
demo.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
demo.sinks.k1.kafka.bootstrap.servers=single:9092
demo.sinks.k1.kafka.topic=flume-topic

# 5、关联
demo.sources.s1.channels=c1
demo.sinks.k1.channel=c1
a.sources=s1
a.channels=c1
a.sinks=k1

a.sources.s1.type=org.apache.flume.source.kafka.KafkaSource
a.sources.s1.kafka.bootstrap.servers=single:9092
a.sources.s1.kafka.consumer.group.id=flume
a.sources.s1.kafka.topics=flume-topic

a.channels.c1.type=memory
a.channels.c1.capacity=20000
a.channels.c1.transactionCapacity=10000

a.sinks.k1.type=logger

a.sources.s1.channels=c1
a.sinks.k1.channel=c1
10-11 09:33