kafka介绍和常见操作
# 1.应用场景方面
RabbitMQ:用于实时的,对可靠性要求较高的消息传递上。
kafka:用于处于活跃的流式数据,大数据量的数据处理上。
# 2.架构模型方面
producer,broker,consumer
RabbitMQ:以broker为中心,有消息的确认机制
kafka:以consumer为中心,无消息的确认机制
# 3.吞吐量方面
RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储可以采用内存或硬盘,吞吐量小。
kafka:内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,吞吐量高。
# 4.集群负载均衡方面
RabbitMQ:本身不支持负载均衡,需要loadbalancer的支持
kafka:采用zookeeper对集群中的broker,consumer进行管理,可以注册topic到zookeeper上,通过zookeeper的协调机制,producer保存对应的topic的broker信息,可以随机或者轮询发送到broker上,producer可以基于语义指定分片,消息发送到broker的某个分片上。
kafka单机版搭建
下载kafka_2.13-2.6.0.tgz安装包
tar -zxf kafka_2.13-2.6.0.tgz
mv kafka_2.13-2.6.0/ kafka
自带的Zookeeper程序脚本与配置文件名与原生Zookeeper稍有不同。kafka自带的Zookeeper程序在bin目录下的zookeeper-server-start.sh脚本进行启动,zookeeper-server-stop.sh脚本进行停止。另外Zookeeper的配制文件在路径config/zookeeper.properties,如果有需要可以修改其中的参数。
首先强调一点,kafka的日志目录和zookeeper数据目录,这两项默认放在tmp目录,而tmp目录中内容会随重启而丢失,所以我们遇到的时候最好自定义一个路径。 zookeeper.properties 配置为
dataDir=/data/kafka/data/kfkzookeeper clientPort=2181 admin.enableServer=false tickTime=2000 initLimit=10 syncLimit=5 server.1=172.168.199.223:2888:3888
zookeeper配置myid文件 三台服务器都要在其zookeeper数据目录dataDir下创建一个myid文件,文件内只需填入上述配置文件中broker id的值,作为集群识别标识。以其中一台192.168.11.21服务器为例:
[root@host-192-168-11-21 ~]# cd /data/kafka/data/kfkzookeeper [root@host-192-168-11-21 kfkzookeeper]# echo 1 > myid 创建好后,查看一下,没问题,myid里面只有一个数值
kafka配置文件
进入kafka/config目录下,参考如下修改server.properties文件
broker.id=1 delete.topic.enable=true num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/data/kafka num.partitions=3 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=172.168.199.223:2181 zookeeper.connection.timeout.ms=60000 group.initial.rebalance.delay.ms=0
启动zookeeper服务
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
查看java项目进程
jps
# 5.kafka常见操作
启动Zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
关闭 Zookeeper
bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
关闭kafka
bin/kafka-server-stop.sh config/server.properties
创建topic
--partitions指定分区数 --replication-factor 指定分区数的副本
bin/kafka-topics.sh --create --zookeeper 192.168.11.21:2181 --replication-factor 3 --partitions 3 --topic test
./kafka-topics.sh --create --zookeeper 172.168.199.223:2181 --replication-factor 1 --partitions 1 --topic fgbp-log-pro
查看topic列表
bin/kafka-topics.sh --list --zookeeper 192.168.11.21:2181
查看topic详情
bin/kafka-topics.sh --zookeeper 192.168.11.21:2181 --describe --topic test
创建生产者,在一台服务器
bin/kafka-console-producer.sh --broker-list 192.168.11.21:9092 --topic test
创建消费者,在另一台服务器
bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.22:9092 --topic test
删除topic
bin/kafka-topics.sh --zookeeper 192.168.11.22:2181 --delete --topic test
查看kafka topic数据内容
kafka-console-consumer.sh --bootstrap-server kafka-node-0:9093 kafka-node-1:9094 kafka-node-2:9095 --from-beginning --topic gaoyingfutures_bars_live