milvus/internal/mq/mqimpl/rocksmq/server/rocksmq_reader.go

133 lines
3.3 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"path"
"strconv"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/tecbot/gorocksdb"
)
type rocksmqReader struct {
store *gorocksdb.DB
topic string
readerName string
readOpts *gorocksdb.ReadOptions
iter *rocksdbkv.RocksIterator
currentID UniqueID
messageIDInclusive bool
readerMutex chan struct{}
}
//Seek seek the rocksmq reader to the pointed position
func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet
rr.currentID = msgID
dataKey := path.Join(rr.topic, strconv.FormatInt(msgID, 10))
rr.iter.Seek([]byte(dataKey))
if !rr.messageIDInclusive {
rr.currentID++
rr.iter.Next()
}
}
func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
var err error
iter := rr.iter
var msg *ConsumerMessage
getMsg := func() {
key := iter.Key()
val := iter.Value()
tmpKey := string(key.Data())
if key != nil {
key.Free()
}
var msgID UniqueID
msgID, err = strconv.ParseInt(tmpKey[len(rr.topic)+1:], 10, 64)
msg = &ConsumerMessage{
MsgID: msgID,
}
origData := val.Data()
dataLen := len(origData)
if dataLen > 0 {
msg.Payload = make([]byte, dataLen)
copy(msg.Payload, origData)
}
if val != nil {
val.Free()
}
iter.Next()
rr.currentID = msgID
}
if iter.Valid() {
getMsg()
return msg, err
}
// TODO this is the same logic as pulsar reader, but do we really need to read till the end of the stream
select {
case <-ctx.Done():
log.Debug("Stop get next reader message!")
return nil, ctx.Err()
case _, ok := <-rr.readerMutex:
if !ok {
log.Warn("reader Mutex closed")
return nil, fmt.Errorf("reader Mutex closed")
}
rr.iter.Close()
rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts)
dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
iter = rr.iter
iter.Seek([]byte(dataKey))
if !iter.Valid() {
return nil, errors.New("reader iterater is still invalid after receive mutex")
}
getMsg()
return msg, err
}
}
func (rr *rocksmqReader) HasNext() bool {
if rr.iter.Valid() {
return true
}
select {
case _, ok := <-rr.readerMutex:
if !ok {
return false
}
rr.iter.Close()
rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts)
dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
rr.iter.Seek([]byte(dataKey))
return rr.iter.Valid()
default:
return false
}
}
func (rr *rocksmqReader) Close() {
close(rr.readerMutex)
rr.iter.Close()
rr.readOpts.Destroy()
}