kafka下载:
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzvf kafka_2.12-2.5.0.tgz -C /usr/local/
cd /usr/local/kafka_2.13-2.5.0
cd config
复制zookeeper配置项
cat zookeeper.properties |grep -v '#' >> zk.properties
vim zk.properties
修改dataDir项
dataDir=/usr/local/zookeeper/data
复制kafka配置项
cd /usr/local/kafka_2.13-2.5.0/config
cat server.properties | grep -v '#' >>kafka1.properties
vim kafka1.properties
##如果需要开启远程访问则修改如下信息:
//添加host.name
host.name=192.168.0.30
//此项打开
listeners=PLAINTEXT://:9092
//此项打开并设置如下
advertised.listeners=PLAINTEXT://192.168.0.30:9092
重启动服务,即可通过远程连接到kafka主机。再就是注意防火墙
保存退出
启动zookeeper
./bin/zookeeper-server-start.sh -daemon config/zk.properties
netstat -anp|grep 2128
端口存在 则表示 启动成功
启动kafka
./bin/kafka-server-start.sh -daemon config/kafka1.properties
netstat -anp|grep 9092
端口存在 则表示 启动成功
使用:
//查看所有topic列表
./bin/kafka-topics.sh --list --zookeeper localhost:2181
//创建topic 表示创建名为test的topic, 2个备份,3个分区
//replication-factor
//用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
//partions
//主题分区数
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
//删除名为test的topic
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
//消息生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
//消息消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
//从最开头消费名为test的topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
//展示名为test的topic的offset信息
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
//显示分组为testgroup的信息
./bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group testgroup
问题:
生产者消费者不能连接远程kafka集群
错误日志出现 连接拒绝:
Connection refused:
修改kafka配置,开启远程访问即可
注意:
同一个topic 高级消费和低级消费不可同时使用,不然会出现offse提交冲突,两个低级消费者可以同时使用
各项参数:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
rdkafka文档:
https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html
消息顺序:
生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的
单消息者多线程处理建议:
一般不建议使用,因为kafka线程是不安全的。多线程处理时不好处理多线程之间的offset确认
如果对消息顺序没有要求,则无所谓。
建议:根据您的要求,您可以使用一种方法来解决上述可能的问题,并采用固定的方法。但是,我建议一个更可靠的解决方案是使用分区暂停/恢复。您的消费者应该以非常抽象的方式执行以下步骤:
1:轮询()以获取消息。
2:暂停所有相应的主题/分区。
3:将消息分配给工作线程并等待其处理。
4:继续调用poll(),但由于分区暂停,因此在使用方保持活动状态时不会收到任何额外的消息。(确保此时未注册新主题)
5:如果所有工作线程都应报告消息处理成功/失败,则相应地提交偏移量。
6:恢复所有分区。
注意:根据您的方案和要求,可以有更好的方法或其他解决方案。这只是一个想法或可能的解决方案之一。
参数项:
避免Rebalance
Rebalance 发生的时机有三个:
1、组成员数量发生变化
2、订阅主题数量发生变化
3、订阅主题的分区数发生变化
指定一个主题的某些分区消费,不会触发rebalance.即指定partition消费者不触发rebalance
非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过)
设置 session.timeout.ms = 6s。
设置 heartbeat.interval.ms = 2s。
要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。
max.poll.interval.ms //消费者最大允许执行时间,如果超过此时间间隔,而没有进行下一个poll,则认为该使用者失败了,该组将重新平衡,会产生reblance操作,最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点
PHP使用:
安装扩展:
1、安装librdkafka
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install
2、安装rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/php/bin/php-config ###你安装的php下的php-config路径
make && make install
3、启用扩展
php.ini中添加行 ###在php下面的/etc/php.ini 编辑
extension=rdkafka.so
下载查看 or github url :https://github.com/leohe666/kafka
使用问题参考:
Kafka如何保证消息不丢失不重复
https://www.cnblogs.com/cherish010/p/9764810.html
Kafka丢失数据问题优化总结