前言

Kafka是由Apache软件基金会开发的一个开源流处理平台,由ScalaJava编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

安装kafka

我们使用的kafka的版本为

kafka版本
kafka_2.11-1.0.0

我们将kafka解压完成后,和安装其他组件一样,都要配置环境变量.我这里是把kafka解压在/opt/kafka这个目录下,所以配置环境变量如图所示

当我们的环境变量配置完成后,我们就可以开始修改kafka的配置文件了.

修改配置文件

进入kafka的config目录下,我们可以看到许多配置文件,我们需要修改的是server.properties 这个文件

vim server.properties 

在这个文件中修改,找到这几项,把这几项中的ip地址改为自己集群的IP地址

broker.id=0
listeners=PLAINTEXT://192.168.1.130:9092
advertised.listeners=PLAINTEXT://192.168.1.130:9092
zookeeper.connect=192.168.1.130:2181,192.168.1.131:2181,192.168.1.132:2181

# 在另外两个节点上,对server.properties要有几处修改
# broker.id 分别修改成: 1 和 2
# listeners 在ip那里分别修改成子节点对应的,即 PLAINTEXT://192.168.1.131:9092 和 PLAINTEXT://192.168.1.132:9092
# advertised.listeners 也在ip那里分别修改成子节点对应的,即 PLAINTEXT://192.168.1.131:9092 和 PLAINTEXT://192.168.1.132:9092
# zookeeper.connect 不需要修改

修改完成后,把kafka文件夹发送给其他两台集群机器

scp -r kafka slave1:/opt/
scp -r kafka slave2:/opt/

发送成功后,

记得在其他两台机器上配置好环境变量并保存

测试kafka

kafka在启动之前一定要启动zookpeeper,否则启动kafka会报错.

启动zookpeeper的命令这里不多介绍,启动zookpeeper成功后,我们进入kafka目录下

cd /opt/kafka/kafka_2.11/

输入

bin/kafka-server-start.sh config/server.properties &

(注 &这个符号不能少)

启动kafka节点

启动后,我们开始测试,在master上输入

kafka-topics.sh --zookeeper 192.168.1.130:2181,192.168.1.131:2181,192.168.1.132:2181 --topic Test --replication-factor 1 --partitions 1 --create

创建主题Test

创建主题Test成功

然后在主节点上输入

kafka-console-producer.sh --broker-list 192.168.1.130:9092,192.168.1.131:9092,192.168.1.132:9092 --topic Test

启动一个生产者

在其他两个节点上输入

kafka-console-consumer.sh --bootstrap-server 192.168.1.131:9092 --topic Test --from-beginning

kafka-console-consumer.sh --bootstrap-server 192.168.1.132:9092 --topic Test --from-beginning

之后在主节点上的命令行上输入一句话,例如:hello,如果成功的话,应该是这样的,如图:

在主节点上输入的,会自动同步在其他两台节点上

这里我们的kafka也就搭建完成了。

关闭kafka方法,在命令行下,按ctrl加c,退出命令行,输入

bin/kafka-server-stop.sh 

出现类似这样的提示后,

按回车。

即可退出

我们可以使用jps查看一下当前进程

如图:

mark

就代表kafka已经关闭了

本文就写到这里,感谢大家的阅读.