CentOS7安装Kafka和简单测试

初识Kafka

kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • 支持通过kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

Kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

在Kafka有几个比较重要的概念:

  • broker

    用于标识每一个Kafka服务,当然同一台服务器上可以开多个broker,只要他们的broker id不相同即可

  • Topic

    消息主题,从逻辑上区分不同的消息类型

  • Partition

    用于存放消息的队列,存放的消息都是有序的,同一主题可以分多个partition,如分多个partiton时,同样会以如partition1存放1,3,5消息,partition2存放2,4,6消息。

  • Produce

    消息生产者,生产消息,可指定向哪个topic,topic哪个分区中生成消息。

  • Consumer

    消息消费者,消费消息,同一消息只能被同一个consumer group中的consumer所消费。consumer是通过offset进行标识消息被消费的位置。当然consumer的个数取决于此topic所划分的partition,如同一group中的consumer个数大于partition的个数,多出的consumer将不会处理消息。

分布式搭建Kafka

序号 服务器名称 IP地址 用途
1 node01 192.168.136.139
2 node02 192.168.136.154
3 node03 192.168.136.155
  1. 安装JDK8以上版本(oracle jdk)

  2. 安装zookeeper

    • 官网下载zookeeper

    • 解压zookeeper

      1
      tar xf zookeeper-3.4.12.tar.gz -C /opt/
    • 编辑zookeeper配置文件(集群配置文件相同)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      cd /opt/zookeeper-3.4.12/conf
      cp zoo_sample.cfg zoo.cfg
      vim zoo.cfg

      [root@localhost conf]# cat zoo.cfg |grep -v "^#"
      tickTime=2000
      initLimit=10
      syncLimit=5
      dataDir=/var/lib/zookeeper
      dataLogDir=/var/log/zookeeper
      clientPort=2181
      server.1=node01:2888:3888
      server.2=node02:2888:3888
      server.3=node03:2888:3888

      参数说明

      tickTime这个时间是作为zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是说每个tickTime时间就会发送一个心跳。

      initLimit这个配置项是用来配置zookeeper接受客户端(这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到leader的follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。

      当已经超过10个心跳的时间(也就是tickTime)长度后 zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20秒。

      syncLimit这个配置项标识leader与follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是5*2000=10秒。

      dataDir顾名思义就是zookeeper保存数据的目录,默认情况下zookeeper将写数据的日志文件也保存在这个目录里;

      clientPort这个端口就是客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求;

      server.A=B:C:D中的A是一个数字,表示这个是第几号服务器,B是这个服务器的IP地址,C第一个端口用来集群成员的信息交换,表示这个服务器与集群中的leader服务器交换信息的端口,D是在leader挂掉时专门用来进行选举leader所用的端口。

    • 创建serverID标识

      除了修改zoo.cfg配置文件外,zookeeper集群模式下还要配置一个myid文件,这个文件需要放在dataDir目录下。

      这个文件里面有一个数据就是A的值(该A就是zoo.cfg文件中server.A=B:C:D中的A),在zoo.cfg文件中配置的dataDir路径中创建myid文件。

      1
      2
      3
      node01: echo "1" > /var/lib/zookeeper/myid
      node02: echo "2" > /var/lib/zookeeper/myid
      node03: echo "3" > /var/lib/zookeeper/myid
    • scp到其它机器

      1
      2
      scp -r /opt/zookeeper-3.4.12 root@node02:/opt/
      scp -r /opt/zookeeper-3.4.12 root@node03:/opt/
    • 创建文件夹

      1
      2
      mkdir /var/lib/zookeeper
      mkdir /var/log/zookeeper
    • 启动

      1
      [root@localhost conf]# /opt/zookeeper-3.4.12/bin/zkServer.sh start
    • 查看状态

      1
      [root@localhost conf]# /opt/zookeeper-3.4.12/bin/zkServer.sh status
  3. 安装kafka

    • 官网下载kafka

    • 解压kafka

      1
      tar xf kafka_2.11-1.1.0.tgz -C /opt/
    • 修改配置文件(集群配置文件稍有不同)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      [root@localhost soft]# cd /opt/kafka_2.11-1.1.0/config/
      [root@localhost config]# cat server.properties |grep -v "^#"|grep -v "^$"
      broker.id=1 #node02:broker.id=2 #node03:broker.id=3
      listeners=PLAINTEXT://node01:9092
      #node02:listeners=PLAINTEXT://node02:9092
      #node03:listeners=PLAINTEXT://node03:9092
      num.network.threads=3
      num.io.threads=8
      socket.send.buffer.bytes=102400
      socket.receive.buffer.bytes=102400
      socket.request.max.bytes=104857600
      log.dirs=/var/lib/kafka
      num.partitions=2
      num.recovery.threads.per.data.dir=1
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      log.retention.hours=168
      log.segment.bytes=1073741824
      log.retention.check.interval.ms=300000
      zookeeper.connect=node01:2181,node02:2181,node03:2181
      zookeeper.connection.timeout.ms=6000
      group.initial.rebalance.delay.ms=0

      参数说明:

      broker.id broker唯一标识
      listeners kafka监听IP及安全方式
      log.dirs 日志存储
      num.partitions 创建topic时默认partition数量
      zookeeper.connect zookeeper服务器地址

    • 启动kafka(所有集群启动)

      1
      2
      [root@node01 ~]# cd /opt/kafka_2.11-1.1.0/bin
      ./kafka-server-start.sh -daemon ../config/server.properties
    • 查看

      1
      jps

使用kafka

  1. 创建topic

    1
    ./bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --topic my-test-topic --partitions 5 --replication-factor 1
  2. 创建生产者

    1
    ./bin/kafka-console-producer.sh --topic my-test-topic --broker-list node01:9092,node02:9092,node03:9092
  3. 创建消费者

    1
    ./bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic my-test-topic --from-beginning

--------------------本文结束,感谢您的阅读--------------------

本文标题:CentOS7安装Kafka和简单测试

文章作者:弓昭

发布时间:2018年12月21日 - 20:40

最后更新:2020年04月08日 - 22:20

原始链接:https://gongzhao1.gitee.io/CentOS7安装Kafka和简单测试/

联系邮箱:gongzhao1@foxmail.com