问题导读
1.安装kafka是否需要安装zookeeper?
2.kafka安装需要哪些步骤?
3.如何验证kafka是否安装成功?
4.flume source目录是哪个?
5.flume在kafka中扮演什么角色?
6.如何测试
整合
配置是否成功?
kafka安装
flume与kafka整合很多人都用到,但是网上却没有一份详细可靠的教程。说的都是些只言片语。这里整理份flume与kafka整合的教程。
flume原先并不兼容kafka。后来兼容添加上去。对于flume及与kafka的相关知识,推荐参考
flume应该思考的问题
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22102
上面只是加深对flume的认识。下面我们开始整合flume与kafka。
思路:
1.安装kafka
2.安装flume,在配置中添加kafka相关配置
这里使用的版本:
kafka:kafka_2.11-0.9.0.1.tgz
flume:apache-flume-1.6.0-bin.tar.gz
kafka的安装
一、安装zookeeper
在master机器进行以下操作。
1. 解压zookeeper
下载zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
[Bash shell]
纯文本查看
复制代码
?
1
|
tar
-zxvf ~
/jar/zookeeper-3
.4.6.
tar
.gz -C
/data
|
网盘下载
链接: https://pan.baidu.com/s/1i50vzb3 密码: p5s5
2. 配置zookeeper
涉及到的配置文件为
[Bash shell]
纯文本查看
复制代码
?
1
|
${ZOOKEEPER_HOME}
/conf/zoo
.cfg
|
zoo.cfg通过cp zoo_sample.cfg zoo.cfg得到。
[Bash shell]
纯文本查看
复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
tickTime=2000
initLimit=10
syncLimit=5
dataDir=
/data/zk_data
clientPort=2181
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
|
这儿解释下格式为server.X=host:port1:port2的意思,X表示当前host所运行的服务的zookeeper服务的id(在接下来填写myid时需要用到),port1表示zookeeper中的follower连接到leader的端口号,port2表示leadership时所用的端口号。注意:需要手动去创建dataDir所配置的/data/zk_data目录(mkdir -p /data/zk_data)更多配置可参考:
zookeeper配置文件详解
http://www.aboutyun.com/thread-13909-1-1.html
3. 填写myid
在zoo.cfg配置文件中的dataDir目录(在这儿是/data/data_zk)下创建myid文件,文件内容为zoo.cfg中master所对应的server.X。
[Bash shell]
纯文本查看
复制代码
?
1
|
echo
"1"
>
/data/zk_data/myid
|
4. 复制到其他节点
[Bash shell]
纯文本查看
复制代码
?
1
2
|
scp
-r
/data/zookeeper-3
.4.6/
/data/zk_data
aboutyun@slave1:
/data
scp
-r
/data/zookeeper-3
.4.6/
/data/zk_data
aboutyun@slave2:
/data
|
在slave1上,
[Bash shell]
纯文本查看
复制代码
?
1
|
echo
“2” >
/data/zk_data/myid
|
在slave2上,
[Bash shell]
纯文本查看
复制代码
?
1
|
echo
"3"
>
/data/zk_data/myid
|
5. 添加到环境变量
在master、slave1、slave2上,分别将以下内容添加到~/.bashrc文件中
[Bash shell]
纯文本查看
复制代码
?
1
2
|
export
ZOOKEEPER_HOME=
/data/zookeeper-3
.4.6
export
PATH=$ZOOKEEPER_HOME
/bin
:$PATH
|
然后执行以下命令:source ~/.bashrc
6. 启动验证
在master、slave、slave2上,分别执行zookeeper启动命令。
[Bash shell]
纯文本查看
复制代码
?
由于启动时,每个节点都会试图去连接其它节点,因此先启动的刚开始会连接不上其它的,导致日志中会包含错误信息,在未全启动之前,这个属正常现象。启动完成后,使用jps命令和zkServer.sh status命令经行验证
master机器情况:
slave1机器情况:
slave2机器情况:
说明每个节点都成功启动了QuorumPeerMain进程,并且slave1上的进程为leader,master和slave2上的进程为follower
很多人想知道zookeeper在kafka中的作用,可参考下面文章
kafka在zookeeper中存储结构
http://www.aboutyun.com/forum.php?mod=viewthread&tid=9941
二、安装kafka
在master上进行如下操作:
1. 解压kafka
下载kafka:http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
[Bash shell]
纯文本查看
复制代码
?
1
|
tar
-zxvf ~
/jar/kafka_2
.11-0.9.0.1.tgz -C
/data
|
链接: https://pan.baidu.com/s/1sk8CAeT 密码: 8h2r
2. 配置kakfa
涉及到的配置文件为${KAFA_HOME}/config/server.properties
必须要配置的是这三个参数:broker.id、log.dirs、zookeeper.connect
broker.id表示当前broker的id,要求是唯一的非负数。log.dirs表示kafka日志的存放目录。zookeeper.connect表示连接的zookeeper的地址。
[Bash shell]
纯文本查看
复制代码
?
1
2
3
|
broker.
id
=0
log.
dirs
=
/data/kafka-logs
zookeeper.connect=master:2181,slave1:2181,slave2:2181
|
注意
:需要手动在本地创建/data/kafka-logs目录
3. 复制到其他节点
[Bash shell]
纯文本查看
复制代码
?
1
2
|
scp
/data/kafka_2
.11-0.9.0.1/
/data/kafka-logs/
aboutyun@slave1:
/data
scp
/data/kafka_2
.11-0.9.0.1/
/data/kafka-logs/
aboutyun@slave2:
/data
|
在slave1机器上将server.properties配置文件的broker.id值改为1在slave2机器上将server.properties配置文件的broker.id值改为2
4. 添加环境变量
在master、slave1、slave2机器上,分别将以下内容添加到~/.bashrc文件中
[Bash shell]
纯文本查看
复制代码
?
1
2
|
export
KAFKA_HOME=
/data/kafka_2
.11-0.9.0.1
export
PATH=$KAFKA_HOME
/bin
:$PATH
|
然后执行以下命令:source ~/.bashrc
5. 启动验证
在master、slave1、slave2机器上,分别执行kafka启动命令
[Bash shell]
纯文本查看
复制代码
?
1
2
|
cd
$KAFKA_HOME
kafka-server-start.sh -daemon .
/config/server
.properties
|
之后在每台机器上执行jps命令:
如果每台机器上都成功启动了kafka这个进程,说明我们搭建成功。如果发现某台机器上没有kafka这个进程,可以将kafka的启动命令去掉参数-daemon(加上的话表示后台启动),这样可以直接在屏幕上看到错误信息。
三、kakfa使用示例
1. 创建topic
[Bash shell]
纯文本查看
复制代码
?
1
2
|
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic
test
--replication-factor 1 --partitions 3
|
2. 创建producer
[Bash shell]
纯文本查看
复制代码
?
1
|
kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9094 --topic
test
|
3. 创建consumer
重新打开一个窗口:
[Bash shell]
纯文本查看
复制代码
?
1
|
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic
test
--from-beginning
|
4. 产生消息并接受
在producer的窗口中输入几条测试信息
查看consumer窗口的情况
说明成功消费了的产生的3条信息
flume与kafka整合安装
flume安装,其实也并不复杂,可是整合的时候,很多人遇到这么个情况,消费者收不到信息。这个的原因很多。出现问题,无非两种。
1.对flume和kafka基本不理解,只是照抄。这里面就容易出现问题。比如配置错误agent名字错误,配置过期等
2.对整个过程不理解。比如有的不报错,只是看到几行信息。很可能是kafka还没有启动。而且很多人遇到过这种情况,网络都是通的,防火墙是关闭的,为何连接还是拒绝的,原因可能就是,服务根本没有启动。
上面两个方法,相信可以解决大部分问题,更多的其实还是需要自己去理解和查看出现错误的地方。
当然对于flume的配置的理解,还是推荐参考
flume应该思考的问题
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22102
上面只是加深对flume的认识。
下面开始安装flume及整合kafka
一、Flume安装
1. 压缩安装包
[Bash shell]
纯文本查看
复制代码
?
1
2
|
tar
-zxvf ~
/jar/apache-flume-1
.6.0-bin.
tar
.gz -C
/data
mv
/data/apache-flume-1
.6.0-bin/
/data/flume-1
.6.0
|
网盘下载
链接:
http://pan.baidu.com/s/1bBnF5O
密码:xoll
2. 配置环境变量
编辑文件 ~/.bashrc
sudo vim ~/.bashrc
[Bash shell]
纯文本查看
复制代码
?
1
2
|
export
FLUME_HOME=
/data/flume-1
.6.0
export
PATH=$FLUME_HOME
/bin
:$PATH
|
[Bash shell]
纯文本查看
复制代码
?
3. 配置flume
[Bash shell]
纯文本查看
复制代码
?
1
2
|
cp
flume-
env
.sh.template flume-
env
.sh修改JAVA_HOME
export
JAVA_HOME=
/data/jdk1
.8.0_111
|
4. 验证安装
[Bash shell]
纯文本查看
复制代码
?
二、Flume使用
1. 单节点的agent
1) 增加配置文件
cd $FLUME_HOME/conf
vim single_agent.conf
将以下内容拷贝进去
[Bash shell]
纯文本查看
复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
a1.sources.source1.
type
= spooldir
a1.sources.source1.spoolDir=
/data/aboutyunlog
a1sources.source1.fileHeader = flase
|