centos环境中kafka安装、使用和supervisor管理
  楠木大叔   6/1/24 8:38:09 PM
kafka在实际工作中的应用非常广泛,比如IM聊天和大数据方面都有应用。

centos安装和supervisor管理kafka

导航

  • Kafka简介
  • java环境配置
    • 安装jdk
    • 配置环境变量
    • 命令测试
    • 特别说明
  • 安装kafka
    • 安装kafka服务端
    • 测试
    • 安装Kafka客户端工具
  • 使用supervisor管理kafka服务
    • Supervisor是什么
    • centos 安装Supervisor
    • supervisor管理kafka服务
  • 命令行工具模拟生产者消费者
  • 常见的kafka操作命令行
  • 结语
  • 参考

Kafka简介

Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

Kafka是一款开源的、轻量级的、分布式的、可分区和具有复制备份的(Replicated)、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。与传统的消息系统相比,Kafka能够很好地处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。

据Kafka官方网站介绍,Kafka定位就是一个分布式流处理平台。

Kafka能够很好地建立实时流式数据通道,由该通道可靠地获取系统或应用程序的数据,也可以通过Kafka方便地构建实时流数据应用来转换或对流式数据进行响应处理。

Kafka的应用已经非常广泛,常见的IM系统、电商网站、推荐系统都有它的身影...

特别是,近年来,在大数据处理中Flink + Kafka的组合逐渐流行起来,让kafka在大数据处理领域也有了更多一展身手的机会。

kafka涉及的知识点较多,其中最重要的是理解生产者-消费者模式,更加具体的概念,可以进一步查阅《官方文档》

废话不多说,我们赶紧开始搭建kafka的搭建吧!

java环境配置

Kafka运行需要zookeeper配合,而zookeeper需要运行在JVM上,所以需要安装JDK。Kafka 从2.0.0版本开始就不再支持JDK7及以下版本,实际工作中我们一般安装JDK8+。

在Kafka官网下载页面的更新文档中明确指出:

We have dropped support for Java 7 and removed the previously deprecated Scala producer and consumer.--kakfa 2.0.0

操作系统:CentOS 8.2 64bit 云服务器

我们开始配置java环境。

安装jdk

(1) 去Java Downloads | Oracle中下载JDK的安装文件jdk-8u341-linux-x64.tar.gz。
到Oracle官网上Java Downloads中下载JDK的安装文件jdk-8u361-linux-x64.tar.gz,上传到centos上,接着执行本地安装jdk。



(2) 新建/home/java文件夹,将jdk-8u361-linux-x64.tar.gz放到该文件夹下,并切换到/home/java目录下。

(3) 执行命令tar -zxvf jdk-8u361-linux-x64.tar.gz进行解压,解压后/home/java目录下多了jdk1.8.0_361文件夹。


通过以上步骤,JDK安装完毕。

Notes:
a.下载JDK,Oracle官方要求登录,没有账户的请按照说明注册即可。
b.安装jdk有两种方式,一种方式安装oracle jdk得下载安装包,第二种是安装openjdk。OpenJDK只包含最精简的JDK,建议使用第一种方式安装。

配置环境变量

(1) 在/etc/profile.d目录下增加环境变量脚本文件配置JDK环境变量,/etc/profile在每次启动时会执行/etc/profile.d下全部的脚本文件。

/etc/profile.d目录新建my_env.sh文件,并编辑如下内容:

#JAVA_HOME
export JAVA_HOME=/home/java/jdk1.8.0_361/
export PATH=$PATH:$JAVA_HOME/bin
export CALSSPATH=$CLASSPATH:$JAVA_HOME/lib
 
#JRE_HOME
export JRE_HOME=/home/java/jdk1.8.0_361/jre/
export PATH=$PATH:$JRE_HOME/bin
export CALSSPATH=$CLASSPATH:$JRE_HOME/lib

(2) 执行如下命令,重载配置文件,让profile文件立即生效

[root@hecs-275297 ~]# source /etc/profile

命令测试

(1) 使用javac命令,不会出现command not found错误
(2) 使用java -version,出现版本为java version "1.8.0_361"

[root@hecs-275297 ~]# java -version
java version "1.8.0_361"
Java(TM) SE Runtime Environment (build 1.8.0_361-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.361-b09, mixed mode)

我们能看到jdk版本已经生效,如果jdk没有安装成功,将会提示-bash: java: command not found

(3) 查看配置是否都正确,echo $JAVA_HOME,echo $CLASSPATH,echo $PATH

[root@hecs-275297 ~]# echo $PATH
/home/java/jdk1.8.0_361//bin:/home/java/jdk1.8.0_361//bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
[root@hecs-275297 ~]# echo $CLASSPATH
.:/home/java/jdk1.8.0_361//lib:/home/java/jdk1.8.0_361//jre/lib
[root@hecs-275297 ~]# echo $PATH
/home/java/jdk1.8.0_361//bin:/home/java/jdk1.8.0_361//bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
[root@hecs-275297 ~]#

特别说明

Centos7系统中配置JDK环境变量的两种方式:

  • 修改/etc/profile文件配置JDK环境变量
  • /etc/profile.d目录下增加环境变量脚本文件,配置JDK环境变量

/etc/profile文件中,也给出了如下说明:

# It's NOT a good idea to change this file unless you know what you
# are doing. It's much better to create a custom.sh shell script in
# /etc/profile.d/ to make custom changes to your environment, as this
# will prevent the need for merging in future updates.

不建议在/etc/profile文件中设置系统环境变量,/etc/profile.d/etc/profile更好管理。

安装kafka

安装指引

(1) 在home目录下新建kafka文件夹,使用wget命令,远程下载kafka_2.13-2.8.2.tgz安装文件

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.2/kafka_2.13-2.8.2.tgz  --no-check-certificate


此时,在/home/kafka目录下就会出现名为kafka_2.13-2.8.2.tgz的压缩文件。

Notes: 这里选择国内的清华源,下载速度更快。

(2) 解压文件

进入/home/kafka,将压缩文件解压到当前目录

[root@hecs-275297 ~]# cd /home/kafka/
[root@hecs-275297 kafka]# tar -zxvf kafka_2.13-2.8.2.tgz
kafka_2.13-2.8.2/
kafka_2.13-2.8.2/LICENSE
kafka_2.13-2.8.2/NOTICE
kafka_2.13-2.8.2/bin/
kafka_2.13-2.8.2/bin/kafka-delete-records.sh
...

(3) 修改配置文件

/home/kafka/kafka_2.13-2.8.2/config/目录下有个server.properties文件。
修改其中的配置:

broker.id=0
log.dir=log.dirs=/tmp/kafka-logs
#配置zookeeper管理kafka的路径
zookeeper.connect=localhost:2181
#配置kafka的监听端口
listeners=PLAINTEXT://:9092
#把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP
advertised.listeners=PLAINTEXT://113.116.211.38:9092

Notes: advertised.listeners 以自己的服务器IP地址为准。

(4) 启动

这里采用kafka单机部署模式,修改完上述配置参数之后就可以启动服务。

先启动zookeeper,命令如下:

[root@hecs-275297 ~]# cd /home/kafka/kafka_2.13-2.8.2/
[root@hecs-275297 kafka_2.13-2.8.2]# bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/zookeeper-server-start.sh 

再启动kafka,命令如下:

[root@hecs-275297 kafka_2.13-2.8.2]# bin/kafka-server-start.sh -daemon config/server.properties

测试

通过上面的步骤,我们已经启动了zookeeper和kafka,那么他可以访问了吗?

(1) 创建有测试一个kafka topic

进入到kafka目录下,接着通过下面命令创建一个kafka topic。

[root@hecs-275297 ~]# cd /home/kafka/kafka_2.13-2.8.2/
[root@hecs-275297 kafka_2.13-2.8.2]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

查看创建的topic,可以使用以下命令进行查看

[root@hecs-275297 kafka_2.13-2.8.2]# bin/kafka-topics.sh --list --zookeeper localhost:2181
testTopic

(2) 使用jps命令查看Kafka服务进程是否已经启动,如下:

[root@hecs-275297 kafka_2.13-2.8.2]# jps -l
3945850 kafka.Kafka
1225 org.tanukisoftware.wrapper.WrapperSimpleApp
3945361 org.apache.zookeeper.server.quorum.QuorumPeerMain
3966082 sun.tools.jps.Jps
[root@hecs-275297 kafka_2.13-2.8.2]#

安装Kafka客户端工具

Kafka的应用非常广泛,在实际工作中,往往会查看推送的消息内容,方便排查问题,安装一个kafka可视化工具显得尤为重要,而kafka Tool 就是这样的利器。

下载kafkatool可以前往官网:https://www.kafkatool.com/download.html



笔者的开发机是Windows 10/11,所以这里选择Windows 64-Bit下载。 选择对应版本,根据提示安装即可。

安装之后打开客户端,新建kafka连接

  • 填写连接名称和服务器IP地址
  • 填写kafka服务器IP+端口


填好之后,保存信息,并连接。连接成功效果如下:



此时,在客户端我们看到了之前在服kafka服务端新建的topic——testTopic

至此,kafka服务端和客户端的搭建就完成了。

Notes:
a. kafka的服务器IP地址,要以你自己的服务器地址为准
b. 远程连接kafka的服务器的端口,需要开通端口访问权限。
云服务器可以参考《阿里云添加安全组规则》

使用supervisor管理kafka服务

Supervisor是什么

Supervisor是一个Python写的进程管理工具,可以方便用于启动、重启、关闭进程。特别适合需要常驻内存的进程。

Supervisor相关命令:

# 启动supervisord
supervisord -c /etc/supervisord.conf
supervisorctl -c /etc/supervisord.conf

# 停止supervisord
supervisorctl shutdown

# 重新载入配置
supervisorctl reload

# 查看程序状态
supervisorctl status 

# 查看服务器进程
ps -ef | grep supervisord

centos 安装Supervisor

(1) 使用yum安装Supervisor

yum install -y supervisor


安装好后在/etc/会生成一个supervisord.conf文件及一个supervisord.d文件目录

(2) 查看supervisor是否安装成功

[root@hecs-275297 /]# supervisord --version
4.2.2

(3) 启动

[root@hecs-275297 ~]# supervisord -c /etc/supervisord.conf

查看supervisor是否启动成功

[root@hecs-275297 ~]# ps -ef|grep supervisord
root     4031860       1  0 21:05 ?        00:00:00 /usr/bin/python3.6 /usr/bin/supervisord -c /etc/supervisord.conf
root     4033110 4030772  0 21:08 pts/0    00:00:00 grep --color=auto supervisord
[root@hecs-275297 ~]#

(3) 设置supervisor 开机启动

[root@hecs-275297 ~]# systemctl enable supervisord
Created symlink /etc/systemd/system/multi-user.target.wants/supervisord.service /usr/lib/systemd/system/supervisord.service.
[root@hecs-275297 ~]#

检查是否是开机启动

[root@hecs-275297 ~]# systemctl is-enabled supervisord
enabled

(4) 查看服务状态

[root@hecs-275297 ~]# systemctl status supervisord.service
supervisord.service - Process Monitoring and Control Daemon
   Loaded: loaded (/usr/lib/systemd/system/supervisord.serviceenabledvendor presetdisabled)
   Activeactive (runningsince Sat 2023-02-11 21:28:42 CST; 48s ago
  Process: 4040629 ExecStart=/usr/bin/supervisord -c /etc/supervisord.conf (code=exitedstatus=0/SUCCESS)
 Main PID: 4040632 (supervisord)
    Tasks: 1 (limit: 23712)
   Memory: 16.3M
   CGroup: /system.slice/supervisord.service
           └─4040632 /usr/bin/python3.6 /usr/bin/supervisord -c /etc/supervisord.conf

Feb 11 21:28:42 hecs-275297 systemd[1]: Starting Process Monitoring and Control Daemon...
Feb 11 21:28:42 hecs-275297 systemd[1]: Started Process Monitoring and Control Daemon.
[root@hecs-275297 ~]#

(5) 配置supervisor,web管理页面
修改配置信息,supervisor 默认配置文件,放在 /etc/supervisord.conf 路径中:

[inet_http_server]         ; HTTP 服务器,提供 web 管理界面
port=*:9001                ; Web 管理后台运行的 IP 和端口
username=admin ; 登录管理后台的用户名
password=123               ; 登录管理后台的密码
[include]                     
files = supervisord.d/*.conf ;配置文件夹

修改完之后重启:

[root@hecs-275297 ~]# supervisorctl reload
Restarted supervisord

Notes: 为了演示,这的账户和密码设置很简单,实际在配置的时候,建议设置复杂密码。

在浏览器访问:http://ip:9001



输入刚才在supervisord.conf文件中设置的账户和密码。

这个时候我们可以通过9001端口访问下这个页面,就能看到一个没有任务列表的的页面



至此,supervisor安装完毕!

supervisor管理kafka服务

万事具备,我们终于可以将zookeeper和kafka服务进程加入到supervisor中管理起来了。
我们先在/etc/目录下新建一个supervisor文件夹,用来存放我们自定义conf文件。
并修改/etc/supervisord.conf文件

[include]
files = supervisor/*.conf

(1) 新建一个zookeeper.conf文件

进入/etc/supervisor/目录,新建一个zookeeper.conf文件,添加内容如下:

[program:zookeeper]
command=/home/kafka/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh //home/kafka/kafka_2.13-2.8.2/config/zookeeper.properties
user=root
autostart=true
autorestart=true
startsecs=10
# 日志信息
log_stdout=true
log_stderr=true
stderr_logfile=/var/log/data/supervisor_zookeeper.err.log
stdout_logfile=/var/log/data/supervisor_zookeeper.out.log
redirect_stderr=true
environment=JAVA_HOME=/home/java/jdk1.8.0_361/

(2) 新建一个kafka.conf文件

进入/etc/supervisor/目录,新建一个kafka.conf文件,添加内容如下:

[program:kafka]
command=/home/kafka/kafka_2.13-2.8.2/bin/kafka-server-start.sh /home/kafka/kafka_2.13-2.8.2/config/server.properties
user=root
autostart=true
autorestart=true
startsecs=10
# 日志信息
log_stdout=true
log_stderr=true
stderr_logfile=/var/log/data/supervisor_kafka.err.log
stdout_logfile=/var/log/data/supervisor_kafka.out.log
redirect_stderr=true
environment=JAVA_HOME=/home/java/jdk1.8.0_361/

修改完之后重启:

[root@hecs-275297 ~]# supervisorctl reload
Restarted supervisord

此时,刷新浏览器http://ip:9001



同样,我们supervisor命令也可以检查管理的进程

[root@hecs-275297 ~]# supervisorctl status
kafka RUNNING pid 4127959, uptime 0:34:11
zookeeper RUNNING pid 4125713, uptime 0:35:21
[root@hecs-275297 ~]#

在使用kafka客户端工具连接一下是否正常


一切都和我们预想的一样,有了supervisor来管理,zookeeper和kafka的启停将变得更加方便。

注意点:

如果在上述步骤中遇到

/home/kafka_2.11-2.0.0/bin/kafka-run-class.sh: line 306: /home/java/jdk1.8.0_131//bin/java: Permission denied

类似的提示。

这是linux安装jdk环境变量权限不够的问题。

执行chmod +x /home/java/jdk1.8.0_131/bin/java 提升权限。(jdk路径以实际为准)

命令行工具模拟生产者消费者

模拟生产者

./kafka-console-producer.sh --broker-list <Kafka服务器地址>:<端口号> --topic <主题名>

执行该命令后,将会进入生产者终端窗口,你可以在该窗口中输入消息并按回车键发送。发送的消息将被追加到指定的主题中。

我们在生产者终端窗口输入:"hello,world!","这里是成都"

模拟消费者

bin/kafka-console-consumer.sh --bootstrap-server <Kafka服务器地址>:<端口号> --topic <主题名> --from-beginning

bin/kafka-console-consumer.sh --bootstrap-server localhost:19099 --topic testTopic --from-beginning

执行该命令后,将会进入消费者终端窗口,你可以在该窗口中读取并显示指定主题中的消息。该命令将从主题的起始处开始读取消息。



常见的kafka操作命令行

##启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 
bin/kafka-server-start.sh -daemon config/server.properties 

#
# 停服
bin/kafka-server-stop.sh 
bin/zookeeper-server-stop.sh 


#
# 查看topics
 bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

#
#查看某个topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic <主题名>

#
#创建某个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <主题名>

kafka删除topic命令
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic <主题名>

kafka-topics.sh --zookeeper <zookeeper-quorum> --delete --topic <topic-name>

kafka-topics.sh --zookeeper localhost:2181 --delete --topic <topic-name>

#
查看消费者ConsumerGroup列表
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka服务器地址>:<端口号> --list

#
# 模拟生产者操作
bin/kafka-console-producer.sh --broker-list <Kafka服务器地址>:<端口号> --topic <主题名>

#
# 模拟消费者操作
bin/kafka-console-consumer.sh --bootstrap-server <Kafka服务器地址>:<端口号> --topic <主题名> --from-beginning

结语

本文主要是对Centos环境下kafka的安装和管理进行了详细介绍,相信通过这篇文章能够给新手们一个好的指引,帮助大家少走弯路。

kafka在实际工作中的应用非常广泛,比如IM聊天和大数据方面都有应用。

感谢您的阅读。

选择并聚焦到一点,把事情做到极致。

参考

版权声明: 本文为智客工坊「楠木大叔」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。