centos安装和supervisor管理kafka
导航
- Kafka简介
- java环境配置
- 安装jdk
- 配置环境变量
- 命令测试
- 特别说明
- 安装kafka
- 安装kafka服务端
- 测试
- 安装Kafka客户端工具
- 使用supervisor管理kafka服务
- Supervisor是什么
- centos 安装Supervisor
- supervisor管理kafka服务
- 命令行工具模拟生产者消费者
- 常见的kafka操作命令行
- 结语
- 参考
Kafka简介
Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
Kafka是一款开源的、轻量级的、分布式的、可分区和具有复制备份的(Replicated)、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。与传统的消息系统相比,Kafka能够很好地处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。
据Kafka官方网站介绍,Kafka定位就是一个分布式流处理平台。
Kafka能够很好地建立实时流式数据通道,由该通道可靠地获取系统或应用程序的数据,也可以通过Kafka方便地构建实时流数据应用来转换或对流式数据进行响应处理。
Kafka的应用已经非常广泛,常见的IM系统、电商网站、推荐系统都有它的身影...
特别是,近年来,在大数据处理中Flink + Kafka的组合逐渐流行起来,让kafka在大数据处理领域也有了更多一展身手的机会。
kafka涉及的知识点较多,其中最重要的是理解生产者-消费者模式,更加具体的概念,可以进一步查阅《官方文档》。
废话不多说,我们赶紧开始搭建kafka的搭建吧!
java环境配置
Kafka运行需要zookeeper配合,而zookeeper需要运行在JVM上,所以需要安装JDK。Kafka 从2.0.0版本开始就不再支持JDK7及以下版本,实际工作中我们一般安装JDK8+。
在Kafka官网下载页面的更新文档中明确指出:
We have dropped support for Java 7 and removed the previously deprecated Scala producer and consumer.--kakfa 2.0.0
操作系统:CentOS 8.2 64bit 云服务器
我们开始配置java环境。
安装jdk
(1) 去Java Downloads | Oracle中下载JDK的安装文件jdk-8u341-linux-x64.tar.gz。
到Oracle官网上Java Downloads中下载JDK的安装文件jdk-8u361-linux-x64.tar.gz,上传到centos上,接着执行本地安装jdk。

(2) 新建/home/java文件夹,将jdk-8u361-linux-x64.tar.gz放到该文件夹下,并切换到/home/java目录下。
(3) 执行命令tar -zxvf jdk-8u361-linux-x64.tar.gz
进行解压,解压后/home/java目录下多了jdk1.8.0_361文件夹。

通过以上步骤,JDK安装完毕。
Notes:
a.下载JDK,Oracle官方要求登录,没有账户的请按照说明注册即可。
b.安装jdk有两种方式,一种方式安装oracle jdk得下载安装包,第二种是安装openjdk。OpenJDK只包含最精简的JDK,建议使用第一种方式安装。
配置环境变量
(1) 在/etc/profile.d
目录下增加环境变量脚本文件配置JDK环境变量,/etc/profile
在每次启动时会执行/etc/profile.d下全部的脚本文件。
在/etc/profile.d
目录新建my_env.sh文件,并编辑如下内容:
#JAVA_HOME
export JAVA_HOME=/home/java/jdk1.8.0_361/
export PATH=$PATH:$JAVA_HOME/bin
export CALSSPATH=$CLASSPATH:$JAVA_HOME/lib
#JRE_HOME
export JRE_HOME=/home/java/jdk1.8.0_361/jre/
export PATH=$PATH:$JRE_HOME/bin
export CALSSPATH=$CLASSPATH:$JRE_HOME/lib
(2) 执行如下命令,重载配置文件,让profile文件立即生效
[root@hecs-275297 ~]# source /etc/profile
命令测试
(1) 使用javac命令,不会出现command not found
错误
(2) 使用java -version,出现版本为java version "1.8.0_361"
[root@hecs-275297 ~]# java -version
java version "1.8.0_361"
Java(TM) SE Runtime Environment (build 1.8.0_361-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.361-b09, mixed mode)
我们能看到jdk版本已经生效,如果jdk没有安装成功,将会提示-bash: java: command not found
(3) 查看配置是否都正确,echo $JAVA_HOME
,echo $CLASSPATH
,echo $PATH
[root@hecs-275297 ~]# echo $PATH
/home/java/jdk1.8.0_361//bin:/home/java/jdk1.8.0_361//bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
[root@hecs-275297 ~]# echo $CLASSPATH
.:/home/java/jdk1.8.0_361//lib:/home/java/jdk1.8.0_361//jre/lib
[root@hecs-275297 ~]# echo $PATH
/home/java/jdk1.8.0_361//bin:/home/java/jdk1.8.0_361//bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
[root@hecs-275297 ~]#
特别说明
Centos7系统中配置JDK环境变量的两种方式:
- 修改
/etc/profile
文件配置JDK环境变量 - 在
/etc/profile.d
目录下增加环境变量脚本文件,配置JDK环境变量
在/etc/profile
文件中,也给出了如下说明:
# It's NOT a good idea to change this file unless you know what you
# are doing. It's much better to create a custom.sh shell script in
# /etc/profile.d/ to make custom changes to your environment, as this
# will prevent the need for merging in future updates.
不建议在/etc/profile文件中设置系统环境变量,/etc/profile.d
比/etc/profile
更好管理。
安装kafka
安装指引
(1) 在home目录下新建kafka文件夹,使用wget
命令,远程下载kafka_2.13-2.8.2.tgz
安装文件
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.2/kafka_2.13-2.8.2.tgz --no-check-certificate

此时,在/home/kafka
目录下就会出现名为kafka_2.13-2.8.2.tgz
的压缩文件。
Notes: 这里选择国内的清华源,下载速度更快。
(2) 解压文件
进入/home/kafka
,将压缩文件解压到当前目录
[root@hecs-275297 ~]# cd /home/kafka/
[root@hecs-275297 kafka]# tar -zxvf kafka_2.13-2.8.2.tgz
kafka_2.13-2.8.2/
kafka_2.13-2.8.2/LICENSE
kafka_2.13-2.8.2/NOTICE
kafka_2.13-2.8.2/bin/
kafka_2.13-2.8.2/bin/kafka-delete-records.sh
...
(3) 修改配置文件
在/home/kafka/kafka_2.13-2.8.2/config/
目录下有个server.properties文件。
修改其中的配置:
broker.id=0
log.dir=log.dirs=/tmp/kafka-logs
#配置zookeeper管理kafka的路径
zookeeper.connect=localhost:2181
#配置kafka的监听端口
listeners=PLAINTEXT://:9092
#把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP
advertised.listeners=PLAINTEXT://113.116.211.38:9092
Notes: advertised.listeners 以自己的服务器IP地址为准。
(4) 启动
这里采用kafka单机部署模式,修改完上述配置参数之后就可以启动服务。
先启动zookeeper,命令如下:
[root@hecs-275297 ~]# cd /home/kafka/kafka_2.13-2.8.2/
[root@hecs-275297 kafka_2.13-2.8.2]# bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/zookeeper-server-start.sh
再启动kafka,命令如下:
[root@hecs-275297 kafka_2.13-2.8.2]# bin/kafka-server-start.sh -daemon config/server.properties
测试
通过上面的步骤,我们已经启动了zookeeper和kafka,那么他可以访问了吗?
(1) 创建有测试一个kafka topic
进入到kafka目录下,接着通过下面命令创建一个kafka topic。
[root@hecs-275297 ~]# cd /home/kafka/kafka_2.13-2.8.2/
[root@hecs-275297 kafka_2.13-2.8.2]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
查看创建的topic,可以使用以下命令进行查看
[root@hecs-275297 kafka_2.13-2.8.2]# bin/kafka-topics.sh --list --zookeeper localhost:2181
testTopic
(2) 使用jps命令查看Kafka服务进程是否已经启动,如下:
[root@hecs-275297 kafka_2.13-2.8.2]# jps -l
3945850 kafka.Kafka
1225 org.tanukisoftware.wrapper.WrapperSimpleApp
3945361 org.apache.zookeeper.server.quorum.QuorumPeerMain
3966082 sun.tools.jps.Jps
[root@hecs-275297 kafka_2.13-2.8.2]#
安装Kafka客户端工具
Kafka的应用非常广泛,在实际工作中,往往会查看推送的消息内容,方便排查问题,安装一个kafka可视化工具显得尤为重要,而kafka Tool 就是这样的利器。
下载kafkatool可以前往官网:https://www.kafkatool.com/download.html

笔者的开发机是Windows 10/11,所以这里选择Windows 64-Bit下载。 选择对应版本,根据提示安装即可。
安装之后打开客户端,新建kafka连接
- 填写连接名称和服务器IP地址
- 填写kafka服务器IP+端口

填好之后,保存信息,并连接。连接成功效果如下:

此时,在客户端我们看到了之前在服kafka服务端新建的topic——testTopic
。
至此,kafka服务端和客户端的搭建就完成了。
Notes:
a. kafka的服务器IP地址,要以你自己的服务器地址为准
b. 远程连接kafka的服务器的端口,需要开通端口访问权限。
云服务器可以参考《阿里云添加安全组规则》
使用supervisor管理kafka服务
Supervisor是什么
Supervisor是一个Python写的进程管理工具,可以方便用于启动、重启、关闭进程。特别适合需要常驻内存的进程。
Supervisor相关命令:
# 启动supervisord
supervisord -c /etc/supervisord.conf
supervisorctl -c /etc/supervisord.conf
# 停止supervisord
supervisorctl shutdown
# 重新载入配置
supervisorctl reload
# 查看程序状态
supervisorctl status
# 查看服务器进程
ps -ef | grep supervisord
centos 安装Supervisor
(1) 使用yum安装Supervisor
yum install -y supervisor

安装好后在/etc/会生成一个supervisord.conf文件及一个supervisord.d文件目录
(2) 查看supervisor是否安装成功
[root@hecs-275297 /]# supervisord --version
4.2.2
(3) 启动
[root@hecs-275297 ~]# supervisord -c /etc/supervisord.conf
查看supervisor是否启动成功
[root@hecs-275297 ~]# ps -ef|grep supervisord
root 4031860 1 0 21:05 ? 00:00:00 /usr/bin/python3.6 /usr/bin/supervisord -c /etc/supervisord.conf
root 4033110 4030772 0 21:08 pts/0 00:00:00 grep --color=auto supervisord
[root@hecs-275297 ~]#
(3) 设置supervisor 开机启动
[root@hecs-275297 ~]# systemctl enable supervisord
Created symlink /etc/systemd/system/multi-user.target.wants/supervisord.service → /usr/lib/systemd/system/supervisord.service.
[root@hecs-275297 ~]#
检查是否是开机启动
[root@hecs-275297 ~]# systemctl is-enabled supervisord
enabled
(4) 查看服务状态
[root@hecs-275297 ~]# systemctl status supervisord.service
● supervisord.service - Process Monitoring and Control Daemon
Loaded: loaded (/usr/lib/systemd/system/supervisord.service; enabled; vendor preset: disabled)
Active: active (running) since Sat 2023-02-11 21:28:42 CST; 48s ago
Process: 4040629 ExecStart=/usr/bin/supervisord -c /etc/supervisord.conf (code=exited, status=0/SUCCESS)
Main PID: 4040632 (supervisord)
Tasks: 1 (limit: 23712)
Memory: 16.3M
CGroup: /system.slice/supervisord.service
└─4040632 /usr/bin/python3.6 /usr/bin/supervisord -c /etc/supervisord.conf
Feb 11 21:28:42 hecs-275297 systemd[1]: Starting Process Monitoring and Control Daemon...
Feb 11 21:28:42 hecs-275297 systemd[1]: Started Process Monitoring and Control Daemon.
[root@hecs-275297 ~]#
(5) 配置supervisor,web管理页面
修改配置信息,supervisor 默认配置文件,放在 /etc/supervisord.conf 路径中:
[inet_http_server] ; HTTP 服务器,提供 web 管理界面
port=*:9001 ; Web 管理后台运行的 IP 和端口
username=admin ; 登录管理后台的用户名
password=123 ; 登录管理后台的密码
[include]
files = supervisord.d/*.conf ;配置文件夹
修改完之后重启:
[root@hecs-275297 ~]# supervisorctl reload
Restarted supervisord
Notes: 为了演示,这的账户和密码设置很简单,实际在配置的时候,建议设置复杂密码。
在浏览器访问:http://ip:9001

输入刚才在supervisord.conf文件中设置的账户和密码。
这个时候我们可以通过9001端口访问下这个页面,就能看到一个没有任务列表的的页面

至此,supervisor安装完毕!
supervisor管理kafka服务
万事具备,我们终于可以将zookeeper和kafka服务进程加入到supervisor中管理起来了。
我们先在/etc/
目录下新建一个supervisor文件夹,用来存放我们自定义conf文件。
并修改/etc/supervisord.conf
文件
[include]
files = supervisor/*.conf
(1) 新建一个zookeeper.conf文件
进入/etc/supervisor/
目录,新建一个zookeeper.conf文件,添加内容如下:
[program:zookeeper]
command=/home/kafka/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh //home/kafka/kafka_2.13-2.8.2/config/zookeeper.properties
user=root
autostart=true
autorestart=true
startsecs=10
# 日志信息
log_stdout=true
log_stderr=true
stderr_logfile=/var/log/data/supervisor_zookeeper.err.log
stdout_logfile=/var/log/data/supervisor_zookeeper.out.log
redirect_stderr=true
environment=JAVA_HOME=/home/java/jdk1.8.0_361/
(2) 新建一个kafka.conf文件
进入/etc/supervisor/
目录,新建一个kafka.conf文件,添加内容如下:
[program:kafka]
command=/home/kafka/kafka_2.13-2.8.2/bin/kafka-server-start.sh /home/kafka/kafka_2.13-2.8.2/config/server.properties
user=root
autostart=true
autorestart=true
startsecs=10
# 日志信息
log_stdout=true
log_stderr=true
stderr_logfile=/var/log/data/supervisor_kafka.err.log
stdout_logfile=/var/log/data/supervisor_kafka.out.log
redirect_stderr=true
environment=JAVA_HOME=/home/java/jdk1.8.0_361/
修改完之后重启:
[root@hecs-275297 ~]# supervisorctl reload
Restarted supervisord
此时,刷新浏览器http://ip:9001

同样,我们supervisor命令也可以检查管理的进程
[root@hecs-275297 ~]# supervisorctl status
kafka RUNNING pid 4127959, uptime 0:34:11
zookeeper RUNNING pid 4125713, uptime 0:35:21
[root@hecs-275297 ~]#
在使用kafka客户端工具连接一下是否正常

一切都和我们预想的一样,有了supervisor来管理,zookeeper和kafka的启停将变得更加方便。
注意点:
如果在上述步骤中遇到
/home/kafka_2.11-2.0.0/bin/kafka-run-class.sh: line 306: /home/java/jdk1.8.0_131//bin/java: Permission denied
类似的提示。
这是linux安装jdk环境变量权限不够的问题。
执行chmod +x /home/java/jdk1.8.0_131/bin/java
提升权限。(jdk路径以实际为准)
命令行工具模拟生产者消费者
模拟生产者
./kafka-console-producer.sh --broker-list <Kafka服务器地址>:<端口号> --topic <主题名>
执行该命令后,将会进入生产者终端窗口,你可以在该窗口中输入消息并按回车键发送。发送的消息将被追加到指定的主题中。
我们在生产者终端窗口输入:"hello,world!","这里是成都"
模拟消费者
bin/kafka-console-consumer.sh --bootstrap-server <Kafka服务器地址>:<端口号> --topic <主题名> --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:19099 --topic testTopic --from-beginning
执行该命令后,将会进入消费者终端窗口,你可以在该窗口中读取并显示指定主题中的消息。该命令将从主题的起始处开始读取消息。

常见的kafka操作命令行
##启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
## 停服
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
## 查看topics
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
##查看某个topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic <主题名>
##创建某个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <主题名>
kafka删除topic命令
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic <主题名>
kafka-topics.sh --zookeeper <zookeeper-quorum> --delete --topic <topic-name>
kafka-topics.sh --zookeeper localhost:2181 --delete --topic <topic-name>
#查看消费者Consumer的Group列表
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka服务器地址>:<端口号> --list
## 模拟生产者操作
bin/kafka-console-producer.sh --broker-list <Kafka服务器地址>:<端口号> --topic <主题名>
## 模拟消费者操作
bin/kafka-console-consumer.sh --bootstrap-server <Kafka服务器地址>:<端口号> --topic <主题名> --from-beginning
结语
本文主要是对Centos环境下kafka的安装和管理进行了详细介绍,相信通过这篇文章能够给新手们一个好的指引,帮助大家少走弯路。
kafka在实际工作中的应用非常广泛,比如IM聊天和大数据方面都有应用。
感谢您的阅读。
选择并聚焦到一点,把事情做到极致。