diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
index 6254ad30b0..81a4ed4d54 100644
--- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
+++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
@@ -12,17 +12,18 @@ import (
 )
 
 type Consumer struct {
-	c          *kafka.Consumer
-	config     *kafka.ConfigMap
-	msgChannel chan mqwrapper.Message
-	hasSeek    bool
-	hasConsume bool
-	skipMsg    bool
-	topic      string
-	groupID    string
-	closeCh    chan struct{}
-	chanOnce   sync.Once
-	closeOnce  sync.Once
+	c               *kafka.Consumer
+	config          *kafka.ConfigMap
+	msgChannel      chan mqwrapper.Message
+	hasSeek         bool
+	hasConsume      bool
+	skipMsg         bool
+	latestMsgOffset kafka.Offset
+	topic           string
+	groupID         string
+	closeCh         chan struct{}
+	chanOnce        sync.Once
+	closeOnce       sync.Once
 }
 
 func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) (*Consumer, error) {
@@ -48,6 +49,25 @@ func (kc *Consumer) createKafkaConsumer() error {
 		log.Error("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err))
 		return err
 	}
+
+	latestMsgID, err := kc.GetLatestMsgID()
+	if err != nil {
+		switch v := err.(type) {
+		case kafka.Error:
+			if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownPartition || v.Code() == kafka.ErrUnknownTopicOrPart {
+				log.Warn("get latest msg ID failed, topic or partition does not exist",
+					zap.String("topic", kc.topic),
+					zap.String("err msg", v.String()))
+				kc.latestMsgOffset = kafka.OffsetBeginning
+			}
+		default:
+			log.Error("get latest msg ID failed", zap.String("topic", kc.topic), zap.Error(err))
+			return err
+		}
+	} else {
+		kc.latestMsgOffset = kafka.Offset(latestMsgID.(*kafkaID).messageID)
+	}
+
 	return nil
 }
 
@@ -77,13 +97,35 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
 			// we assume that case is Chan starting before producing message with auto create topic config,
 			// consuming messages will fail that error is 'Subscribed topic not available'
 			// if invoke Subscribe method of kafka, so we use Assign instead of Subscribe.
-			tps := []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}
-			if err := kc.c.Assign(tps); err != nil {
-				log.Error("kafka consumer subscribe failed ", zap.String("topic name", kc.topic), zap.Error(err))
-				panic(err)
+			var tps []kafka.TopicPartition
+			if offset == kafka.OffsetEnd && kc.latestMsgOffset != kafka.OffsetBeginning {
+				// The consumer only starts once Assign is invoked. To guarantee that "latest"
+				// means the latest position at the time the consumer was created, seek to the
+				// offset captured at creation instead of consuming from kafka.OffsetEnd.
+				if err := kc.internalSeek(kc.latestMsgOffset, false); err != nil {
+					log.Error("kafka consumer subscribe failed ",
+						zap.String("topic name", kc.topic),
+						zap.Any("latestMsgOffset", kc.latestMsgOffset),
+						zap.Any("offset", offset),
+						zap.Error(err))
+					panic(err)
+				}
+			} else {
+				tps = []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}
+				if err := kc.c.Assign(tps); err != nil {
+					log.Error("kafka consumer subscribe failed ",
+						zap.String("topic name", kc.topic),
+						zap.Any("latestMsgOffset", kc.latestMsgOffset),
+						zap.Any("offset", offset),
+						zap.Error(err))
+					panic(err)
+				}
 			}
 
-			log.Debug("starting kafka consume", zap.String("topic name", kc.topic), zap.Any("offset", offset))
+			log.Debug("starting kafka consume",
+				zap.String("topic name", kc.topic),
+				zap.Any("latestMsgOffset", kc.latestMsgOffset),
+				zap.Any("offset", offset))
 		}
 
 		go func() {
@@ -117,6 +159,11 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
 }
 
 func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
+	offset := kafka.Offset(id.(*kafkaID).messageID)
+	return kc.internalSeek(offset, inclusive)
+}
+
+func (kc *Consumer) internalSeek(offset kafka.Offset, inclusive bool) error {
 	if kc.hasSeek {
 		return errors.New("unsupported multiple seek with the same kafka consumer")
 	}
@@ -125,11 +172,10 @@ func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
 		return errors.New("unsupported seek after consume message with the same kafka consumer")
 	}
 
-	start := time.Now()
-	offset := kafka.Offset(id.(*kafkaID).messageID)
 	log.Debug("kafka consumer seek start", zap.String("topic name", kc.topic),
 		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))
 
+	start := time.Now()
 	err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
 	if err != nil {
 		log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
@@ -151,13 +197,11 @@ func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
 		Offset:    offset}, 1000); err != nil {
 		return err
 	}
-
-	kc.hasSeek = true
-
 	cost = time.Since(start).Milliseconds()
 	log.Debug("kafka consumer seek finished", zap.String("topic name", kc.topic),
 		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
 
+	kc.hasSeek = true
 	return nil
 }
 
diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go
index 47213f45c9..159c074bbb 100644
--- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go
+++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go
@@ -61,9 +61,7 @@ func TestKafkaConsumer_GetSeek(t *testing.T) {
 	err = consumer.Seek(msgID, false)
 	assert.Nil(t, err)
 
-	assert.Panics(t, func() {
-		consumer.Seek(msgID, false)
-	})
+	assert.Error(t, consumer.Seek(msgID, false))
 }
 
 func TestKafkaConsumer_SeekAfterChan(t *testing.T) {
@@ -108,6 +106,29 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
 	assert.Nil(t, err)
 }
 
+func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) {
+	rand.Seed(time.Now().UnixNano())
+	groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
+	topic := fmt.Sprintf("test-topicName-%d", rand.Int())
+
+	data := []int{111, 222, 333}
+	testKafkaConsumerProduceData(t, topic, data)
+
+	config := createConfig(groupID)
+	config.SetKey("auto.offset.reset", "latest")
+	consumer, err := newKafkaConsumer(config, topic, groupID)
+	assert.NoError(t, err)
+	defer consumer.Close()
+
+	data = []int{444, 555}
+	testKafkaConsumerProduceData(t, topic, data)
+
+	msg := <-consumer.Chan()
+	assert.Equal(t, 444, BytesToInt(msg.Payload()))
+	msg = <-consumer.Chan()
+	assert.Equal(t, 555, BytesToInt(msg.Payload()))
+}
+
 func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) {
 	ctx := context.Background()
 	kc := createKafkaClient(t)
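
Note for reviewers: the race this patch closes is that librdkafka resolves `kafka.OffsetEnd` only when consumption actually starts (at `Assign` time), not when the `Consumer` object is created, so messages produced in between are silently skipped. Below is a minimal standalone sketch of the same idea against confluent-kafka-go directly, outside the Milvus wrapper. The broker address, the single partition 0, and the use of `QueryWatermarkOffsets` as a stand-in for `GetLatestMsgID` are assumptions made for illustration, not part of this patch.

```go
package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// consumeFromCreationPoint captures the partition's high watermark at
// "consumer creation time" and assigns there explicitly, instead of
// relying on kafka.OffsetEnd, which librdkafka resolves only when
// consumption starts.
func consumeFromCreationPoint(topic, groupID string) error {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // assumption: local test broker
		"group.id":          groupID,
		"auto.offset.reset": "latest",
	})
	if err != nil {
		return err
	}
	defer c.Close()

	// Snapshot the end of partition 0 now; messages produced after this
	// call (but before Assign) will still be delivered.
	_, high, err := c.QueryWatermarkOffsets(topic, 0, 10000)
	if err != nil {
		return err
	}

	// Assign at the captured offset rather than kafka.OffsetEnd.
	if err := c.Assign([]kafka.TopicPartition{
		{Topic: &topic, Partition: 0, Offset: kafka.Offset(high)},
	}); err != nil {
		return err
	}

	// Read a few messages to demonstrate; real code would loop until closed.
	for i := 0; i < 3; i++ {
		msg, err := c.ReadMessage(10 * time.Second)
		if err != nil {
			return err
		}
		fmt.Printf("offset %v: %s\n", msg.TopicPartition.Offset, msg.Value)
	}
	return nil
}

func main() {
	if err := consumeFromCreationPoint("test-topic", "test-group"); err != nil {
		fmt.Println("consume failed:", err)
	}
}
```

Assigning at the captured watermark has the same effect as the patch's `internalSeek(kc.latestMsgOffset, false)`: if, as in this wrapper, the latest message ID is derived from the high watermark, the exclusive seek skips the last already-stored message and delivery begins with the first message produced after creation.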