它是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。

Kafka与其他主流分布式消息系统对比

对比

基本概念

  • 消费者:(Consumer):从消息队列中请求消息的客户端应用程序
  • 生产者:(Producer) :向broker发布消息的应用程序
  • AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例

kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript

Kafka架构

架构

kafka集群中的消息,是通过Topic(主题)来进行组织的。

Topic

topic

一些基本的概念:

  • 1、主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
  • 2、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。

kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

集群工作图

工作图

副本(Replication):为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。

无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

Kafka安装配置

主机 IP 角色 服务 端口
centos-vm2 192.168.134.112 slave kafaka,zookeeper 9092,8080,2181,2888,3888
centos-vm3 192.168.134.113 master kafaka,zookeeper 9092,8080,2181,2888,3888
centos-vm4 192.168.134.114 slave kafaka,zookeeper 9092,8080,2181,2888,3888

端口解析:

  • 9092: kafka服务端口
  • 8080: kafka管理端口
  • 2181: zookeeper服务端口
  • 2888: zookeeper集群通讯端口
  • 3888:zookeeper集群选举端口

软件版本:

  • kafka: 2.11.2-2.1.0
  • zookeeper: 3.4.12

1. Zookeeper安装

见zookeeper安装文档

2. kafka安装

2.1 下载安装kafka

下载地址

解压安装

1
2
tar -zxvf kafka_2.11-2.1.0.tgz -C /opt
ln -s /opt/kafka_2.11-2.1.0 /opt/kafka

2.2 配置kafka

配置文件/opt/kafka/config/server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/logs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大字节数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.134.112:2181,192.168.134.113:2181,192.168.134.114:2181 #设置zookeeper的连接端口
zookeeper.connection.timeout.ms=6000 #zookeeper连接超时时间

主要修改以下几个参数

1
2
3
4
broker.id=0
port=9092
log.dirs=/opt/kafka/logs/
zookeeper.connect=centos-vm2:2181,centos-vm3:2181,centos-vm4:2181

2.3 启动kafka

1
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

2.4 systemd支持

/usr/lib/systemd/system/kafka.service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target
After=network.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/java/jdk1.8.0_102
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

启动kafka

1
2
3
systemctl daemon-reload
ststemctl enable kafaka
ststemctl start kafaka

2.5 kafka集群验证

创建Topic

1
./kafka-topics.sh --create --zookeeper centos-vm2:2181 --replication-factor 2 --partitions 1 --topic test

解释:

  • –replication-factor 2 #复制两份
  • –partitions 1 #创建1个分区
  • –topic #主题为test

在一台服务器上创建一个发布者

1
./kafka-console-producer.sh --broker-list centos-vm4:9092 --topic test  #控制台等待输入信息

在另一台服务器上创建一个订阅者

1
./kafka-console-consumer.sh --bootstrap-server centos-vm2:9092 --topic test --from-beginning  #控制台等待接收发布者的信息

若成功接受到消息,说明集群搭建成功

Kafka-manager安装配置

kafka manager是管理kafka集群的web界面工具

下载地址

1. 解压安装

1
2
3
unzip kafka-manager-master.zip
cd kafka-manager-master
./sbt clean dist # 需要配置sbt国内源,且整个执行过程比较耗费内存资源

生成的zip包位置在kafka-manager-master/target/universal目录下,此处假设生成的包名为kafka-manager-1.3.2.1.zip

解压安装

1
unzip kafka-manager-1.3.2.1.zip

2. 配置

修改kafka-manager-1.3.2.1/conf/application.conf

1
2
3
4
kafka-manager.zkhosts="centos-vm2:2181,centos-vm3:2181,centos-vm4:2181"
basicAuthentication.enabled=true
basicAuthentication.username="admin"
basicAuthentication.password="admin"

添加systemd支持文件:/usr/lib/systemd/system/kafka-manager.service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[Unit]
Description=Kafka manager server
Documentation=https://github.com/yahoo/kafka-manager
Requires=network.target
After=network.target kafka.service

[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/java/jdk1.8.0_102
ExecStart=/opt/kafka-manager-1.3.2.1/bin/kafka-manager -Dconfig.file=/opt/kafka-manager-1.3.2.1/conf/application.conf -Dhttp.port=8090
ExecStop=/opt/kafka-manager-1.3.2.1/bin/kafka-manager stop

[Install]
WantedBy=multi-user.target

3. 启动

1
2
3
systemctl daemon-reload
systemctl enable kafka-manager
systemctl start kafka-manager

访问: http://hostname:8090