初识Kafka
kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
- 支持通过kafka服务器和消费机集群来分区消息。
- 支持Hadoop并行数据加载。
Kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
在Kafka有几个比较重要的概念:
broker
用于标识每一个Kafka服务,当然同一台服务器上可以开多个broker,只要他们的broker id不相同即可
Topic
消息主题,从逻辑上区分不同的消息类型
Partition
用于存放消息的队列,存放的消息都是有序的,同一主题可以分多个partition,如分多个partiton时,同样会以如partition1存放1,3,5消息,partition2存放2,4,6消息。
Produce
消息生产者,生产消息,可指定向哪个topic,topic哪个分区中生成消息。
Consumer
消息消费者,消费消息,同一消息只能被同一个consumer group中的consumer所消费。consumer是通过offset进行标识消息被消费的位置。当然consumer的个数取决于此topic所划分的partition,如同一group中的consumer个数大于partition的个数,多出的consumer将不会处理消息。
分布式搭建Kafka
序号 | 服务器名称 | IP地址 | 用途 |
---|---|---|---|
1 | node01 | 192.168.136.139 | |
2 | node02 | 192.168.136.154 | |
3 | node03 | 192.168.136.155 |
安装JDK8以上版本(oracle jdk)
安装zookeeper
官网下载zookeeper
解压zookeeper
1
tar xf zookeeper-3.4.12.tar.gz -C /opt/
编辑zookeeper配置文件(集群配置文件相同)
1
2
3
4
5
6
7
8
9
10
11
12
13
14cd /opt/zookeeper-3.4.12/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
[root@localhost conf]# cat zoo.cfg |grep -v "^#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888参数说明
tickTime这个时间是作为zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是说每个tickTime时间就会发送一个心跳。
initLimit这个配置项是用来配置zookeeper接受客户端(这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到leader的follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。
当已经超过10个心跳的时间(也就是tickTime)长度后 zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20秒。
syncLimit这个配置项标识leader与follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是5*2000=10秒。
dataDir顾名思义就是zookeeper保存数据的目录,默认情况下zookeeper将写数据的日志文件也保存在这个目录里;
clientPort这个端口就是客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求;
server.A=B:C:D中的A是一个数字,表示这个是第几号服务器,B是这个服务器的IP地址,C第一个端口用来集群成员的信息交换,表示这个服务器与集群中的leader服务器交换信息的端口,D是在leader挂掉时专门用来进行选举leader所用的端口。
创建serverID标识
除了修改zoo.cfg配置文件外,zookeeper集群模式下还要配置一个myid文件,这个文件需要放在dataDir目录下。
这个文件里面有一个数据就是A的值(该A就是zoo.cfg文件中server.A=B:C:D中的A),在zoo.cfg文件中配置的dataDir路径中创建myid文件。
1
2
3node01: echo "1" > /var/lib/zookeeper/myid
node02: echo "2" > /var/lib/zookeeper/myid
node03: echo "3" > /var/lib/zookeeper/myidscp到其它机器
1
2scp -r /opt/zookeeper-3.4.12 root@node02:/opt/
scp -r /opt/zookeeper-3.4.12 root@node03:/opt/创建文件夹
1
2mkdir /var/lib/zookeeper
mkdir /var/log/zookeeper启动
1
[root@localhost conf]# /opt/zookeeper-3.4.12/bin/zkServer.sh start
查看状态
1
[root@localhost conf]# /opt/zookeeper-3.4.12/bin/zkServer.sh status
安装kafka
官网下载kafka
解压kafka
1
tar xf kafka_2.11-1.1.0.tgz -C /opt/
修改配置文件(集群配置文件稍有不同)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23[root@localhost soft]# cd /opt/kafka_2.11-1.1.0/config/
[root@localhost config]# cat server.properties |grep -v "^#"|grep -v "^$"
broker.id=1 #node02:broker.id=2 #node03:broker.id=3
listeners=PLAINTEXT://node01:9092
#node02:listeners=PLAINTEXT://node02:9092
#node03:listeners=PLAINTEXT://node03:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0参数说明:
broker.id broker唯一标识
listeners kafka监听IP及安全方式
log.dirs 日志存储
num.partitions 创建topic时默认partition数量
zookeeper.connect zookeeper服务器地址启动kafka(所有集群启动)
1
2[root@node01 ~]# cd /opt/kafka_2.11-1.1.0/bin
./kafka-server-start.sh -daemon ../config/server.properties查看
1
jps
使用kafka
创建topic
1
./bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --topic my-test-topic --partitions 5 --replication-factor 1
创建生产者
1
./bin/kafka-console-producer.sh --topic my-test-topic --broker-list node01:9092,node02:9092,node03:9092
创建消费者
1
./bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic my-test-topic --from-beginning