Real-Time Stream Processing Frameworks

Frameworks of this type:

Apache Storm            native real-time stream processing framework
Apache Spark Streaming  micro-batch framework; the batch interval can be set fairly small
IBM Streams
Yahoo S4
LinkedIn Kafka

Real-Time Stream Processing Architecture and Technology Choices

Flume => Kafka (a message queue is inserted to absorb large data volumes) => Spark/Storm => HBase

Flume: Distributed Log Collection Framework

Source

  • Avro, Exec, Kafka, NetCat TCP

Channel

  • Memory, File, and Kafka are the most commonly used

Sink

  • Avro, Kafka, HDFS, Hive, HBase (sync/async), Logger (console)

Common Architectures

  • Multiple sources collect data and forward it through avro sink => avro source, aggregating everything in one place (fan-in).
  • A single data source can also be fanned out to several destinations; see the sketch below.
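
A minimal sketch of such a fan-out agent, assuming a hypothetical agent named a2, the same local log file used later in this post, and one logger sink plus one avro sink (the replicating selector is Flume's default and copies every event to every channel):

# Hypothetical fan-out agent "a2": one exec source replicated into two channels
a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2

a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /home/hadoop/data/data.log
# the replicating selector (the default) copies each event to every listed channel
a2.sources.r1.selector.type = replicating
a2.sources.r1.channels = c1 c2

a2.channels.c1.type = memory
a2.channels.c2.type = memory

# one copy is printed to the console ...
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1

# ... the other copy is forwarded to a downstream collector agent
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop000
a2.sinks.k2.port = 44444
a2.sinks.k2.channel = c2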

Hands-On

  1. Collect data from a specified port and write it to the console
# example.conf: A single-node Flume configuration

a1: agent name
r1: source name
k1: sink name
c1: channel name

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# 串起来 Bind the source and sink to the channel
# a source can feed multiple channels, hence the plural "channels"
a1.sources.r1.channels = c1
# a sink reads from exactly one channel, hence the singular "channel"
a1.sinks.k1.channel = c1
  • Start Flume
First set JAVA_HOME for Flume:
vim $FLUME_HOME/conf/flume-env.sh
Add or edit the JAVA_HOME entry:
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144

Start:
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console

You may see an error like this:
log4j:WARN No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Cause:
Either the --conf directory above is wrong, or the -Dflume.root.logger=INFO,console option is mistyped.
Fix: correct the corresponding settings and the warning should go away.

  • Startup screen (screenshot of the running agent)
  • Open another terminal window:
telnet hadoop000 44444
Type any message; the corresponding event appears in the flume agent window.

  2. Collect newly appended data from a file (/home/hadoop/data/data.log) and write it to the console

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
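
Assuming the configuration above is saved as exec-memory-logger.conf (the file name is an assumption), it is started the same way as the first example:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-logger.conf \
-Dflume.root.logger=INFO,console

Appending a line to the file, e.g. echo hello >> /home/hadoop/data/data.log, should make the event show up in the agent console.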


  3. Collect logs on server A and ship them in real time to server B (via an Avro sink)
Agent on A: exec source + memory channel + avro sink
Agent on B: avro source + memory channel + logger sink


exec-memory-avro configuration file

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel


avro-memory-logger configuration file

avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop000
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel



Start avro-memory-logger first, then start exec-memory-avro:
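
A sketch of the launch commands, assuming the two configurations above are saved as avro-memory-logger.conf and exec-memory-avro.conf (the file names are assumptions):

# on server B: start the collector first, so the avro sink on A has something to connect to
flume-ng agent \
--name avro-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-logger.conf \
-Dflume.root.logger=INFO,console

# on server A: then start the forwarding agent
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console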

Kafka

Overview
  1. Kafka is written in Scala.
  2. It is message middleware: producers and consumers exchange messages through topics.
  3. It uses ZooKeeper for cluster management.
Installation steps
  1. Install the JDK (set JAVA_HOME; 1.7.0_79)
  2. Install Scala (set SCALA_HOME; 2.10.4)
  3. Install ZooKeeper (3.4.5)
  4. Install Kafka
Installing Kafka
Start ZooKeeper (cluster setup guide: http://coolxing.iteye.com/blog/1871009)
zkServer.sh start
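
To confirm ZooKeeper is actually up before continuing, its status can be checked:

zkServer.sh status   # should report Mode: standalone (or leader/follower in a cluster)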
Edit the configuration file (single node, single broker):
vim server.properties
# broker id
broker.id=0
# advertised Kafka address
advertised.host.name=hadoop000
# Kafka log directory
log.dirs=/home/hadoop/tmp/kafkaLogs
zookeeper.connect=hadoop000:2181,hadoop001:2181,hadoop002:2181
# allow topics to be deleted
delete.topic.enable=true
# disable automatic topic creation
auto.create.topics.enable=false
Start Kafka
kafka-server-start.sh [-daemon] $KAFKA_HOME/config/server.properties
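
One way to verify that the broker registered itself with ZooKeeper is to inspect the /brokers/ids znode with the ZooKeeper CLI; it should list the registered broker ids, e.g. [0], matching the broker.id set above:

zkCli.sh -server hadoop000:2181
ls /brokers/ids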

Create a topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Delete a topic
kafka-topics.sh --delete --zookeeper hadoop000:2181  --topic my-replicated-topic



View topic information
kafka-topics.sh --list --zookeeper hadoop000:2181
kafka-topics.sh --describe --zookeeper hadoop000:2181 [--topic test]   (describes all topics, or only the specified one)

Produce messages (pointing at the broker)
kafka-console-producer.sh --broker-list hadoop000:9092 --topic test

Consume messages (pointing at ZooKeeper)
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic test --from-beginning

Result: messages typed into the producer console appear in the consumer console.

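To wire Flume into Kafka as in the pipeline at the top of this post, the avro sink of the earlier agent can be swapped for a Kafka sink. A minimal sketch, assuming a hypothetical agent named exec-memory-kafka whose source and channel sections are copied from exec-memory-avro (with the prefix renamed), Flume 1.7+ property names (older releases use brokerList and topic instead of kafka.bootstrap.servers and kafka.topic), and the test topic created above:

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = hadoop000:9092
exec-memory-kafka.sinks.kafka-sink.kafka.topic = test
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

With this agent running, lines appended to data.log should show up in the kafka-console-consumer started above.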

Single-node, multi-broker deployment: http://kafka.apache.org/quickstart#quickstart_multibroker
Only the configuration files change: set broker.id to 1, 2, and 3 respectively,
change the listener ports to 9093, 9094, and 9095,
and point log.dirs at a different directory for each broker, e.g.:
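
A sketch of the three property files; the listener ports match the producer command below, while the log directories are assumptions:

# server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/home/hadoop/tmp/kafka-logs-1

# server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/home/hadoop/tmp/kafka-logs-2

# server-3.properties
broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/home/hadoop/tmp/kafka-logs-3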

Then start the brokers:

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties

Create a topic
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Describe the newly created topic
kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic


Publish messages
kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic

Consume messages
kafka-console-consumer.sh --zookeeper hadoop000:2181 --from-beginning --topic my-replicated-topic


Stop one broker

ps aux |grep kafka |grep server-1
kill -9 2317

The topic stays available; check the replica status again with kafka-topics.sh --describe.

Kill another broker; the topic is still available.


Restart the broker that was stopped earlier; the topic remains available:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
The replica status recovers to its original state.

Kafka PHP extension

http://www.cnblogs.com/imarno/p/5198940.html

Stop Kafka
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-stop.sh

Hadoop Installation

Configuration changes (see the sketch after this list)

  1. Append the ssh id_rsa.pub key to authorized_keys
  2. hadoop-env.sh: set JAVA_HOME
  3. core-site.xml: set fs.defaultFS and hadoop.tmp.dir
  4. hdfs-site.xml: set the replication factor
  5. slaves: set to hadoop000
  6. mapred-site.xml: set mapreduce.framework.name
  7. yarn-site.xml: set yarn.nodemanager.aux-services
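
A sketch of the key properties for a single-node setup; the hostname, port, and paths are illustrative assumptions and must match whatever values are actually used (hbase.rootdir later has to agree with fs.defaultFS):

<!-- core-site.xml -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://hadoop000:8020</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/home/hadoop/app/tmp</value>
</property>

<!-- hdfs-site.xml -->
<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>

<!-- mapred-site.xml -->
<property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
</property>

<!-- yarn-site.xml -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>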

Format the NameNode

bin/hdfs namenode -format

Start HDFS
sbin/start-dfs.sh

Start YARN
sbin/start-yarn.sh

http://hadoop000:50070 (HDFS web UI)
http://hadoop000:8088 (YARN web UI)

HBase Installation

Configuration files (see the sketch after this list)

  1. hbase-env.sh: set JAVA_HOME
  2. hbase-env.sh: HBASE_MANAGES_ZK=false (do not let HBase manage its own ZooKeeper)
  3. hbase-site.xml:
    hbase.rootdir: must match fs.defaultFS in Hadoop's core-site.xml, e.g. hdfs://hadoop000:8082/hbase
    hbase.cluster.distributed = true
    hbase.zookeeper.quorum = hadoop000:2181
  4. regionservers: add hadoop000
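
A sketch of the corresponding entries, using the values from the list above (the JDK path reuses the one set for Flume earlier):

# hbase-env.sh
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144
export HBASE_MANAGES_ZK=false

<!-- hbase-site.xml -->
<property>
  <name>hbase.rootdir</name>
  <value>hdfs://hadoop000:8082/hbase</value>
</property>
<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>hadoop000:2181</value>
</property>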

Start HBase

start-hbase.sh

Test from the command line
hbase shell
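
A quick smoke test inside the shell (table and column family names are made up for illustration):

create 'test_table', 'cf'
put 'test_table', 'row1', 'cf:c1', 'hello'
scan 'test_table'
list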


http://hadoop000:60010 (HBase master web UI)
