记录kafka消息重复消费问题

Kafka重复消费问题

问题复现

public class Test extends Thread {
 		/**
         * 初始化Kafka集群信息.
         */
        private Properties config() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "host1:9092,host2:9092,host3:9092");// 指定Kafka集群地址
            props.put("group.id", "myConsumer");// 指定消费者组
            props.put("enable.auto.commit", "false");// 开启自动提交
            props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔
            /** 
            	不能设置太小,否则消费者注册时报错:
            	Heartbeat must be set lower than the session timeout,默认心跳时间3s	
            	同时还需要在group.min.session.timeout.ms 和group.max.session.timeout.ms之间	
            */ 
            props.put("session.timeout.ms", "8000");
            props.put("group.min.session.timeout.ms", "3000");
            props.put("group.max.session.timeout.ms", "20000");
            props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔
            // 反序列化消息主键
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 反序列化消费记录
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             // 反序列化消息主键
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 反序列化消费记录
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        }
		
        @Override
        public void run() {
        	// topic
        	String topic = "my_kafka_topic";
			//创建一个生产者
			KafkaProducer<String,Object> producer = new KafkaProducer<>(config());
			for(int i=1;i<=3;i++) {
				ProducerRecord producerRecord = new ProducerRecord(topic ,"key-"+i,"value-"+i);
				producer.send(producerRecord);
			}

            // 创建一个消费者实例对象
            KafkaConsumer consumer = new KafkaConsumer(config());
            // 订阅消费主题集合
            consumer.subscribe(Arrays.asList("test_kafka_topic"));
            // 实时消费标识
            while (true) {
                // 获取主题消息数据
                ConsumerRecords records = consumer.poll(10L);
                Iterator iterator = records.iterator();
                while(iterator.hasNext()) {
                	ConsumerRecord record = (ConsumerRecord) iterator.next();
                    // 循环打印消息记录
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    Thread.sleep(360000) //模拟处理六分钟
                    consumer.commitAsync(); //提交
            	}        
            // 出现异常关闭消费者对象
            consumer.close();
        }
}

日志报错

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll was longer than the configured max poll interval ms, which typically implies that the poll loop is spending too much time message processing. 
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in pollo with max pollrecords

无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。
这意味着对轮询的后续调用之间的时间长于配置的最大轮询间隔ms,这通常意味着轮询循环在消息处理上花费了太多时间。
您可以通过增加会话超时或通过使用最大pollrecord减少pollo中返回的批的最大大小来解决这个问题

原理解释:https://www.lxlinux.net/6293.html

解决方案

  1. 减少单次拉取消息条数,增加最大拉取间隔时间。
    消费者配置中,减少单次拉取消息条数max.poll.records,增加获取消息后提交偏移量的最大时间max.poll.interval.ms。
    max.poll.records默认较大,容易产生消费积压导致超过设定的时间(默认5分钟),服务端会认为该消费者失效。

  2. 增加超时时间。
    消费者配置中,增加超时时间session.timeout.ms。
    kafka消费者默认3秒发送一次心跳,若服务端在session.timeout.ms内未检测心跳,会认为该消费者失效。

  3. 调整发送心跳时间
    消费者配置中,调整发送心跳时间heartbeat.interval.ms。
    当使用 Kafka 的分组管理功能时,心跳到消费者协调器之间的预计时间。心跳用于确保消费者的会话保持活动状态,当有新消费者加入或离开组时方便重新平衡。该值必须比 session.timeout.ms 小,通常不高于1/3。它可以调整得更低,以控制正常重新平衡的预期时间。