diff --git a/Makefile b/Makefile index 8dd68fc7e7..2ef0a75942 100644 --- a/Makefile +++ b/Makefile @@ -113,6 +113,10 @@ indexservice: build-cpp @echo "Building distributed indexservice ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexservice $(PWD)/cmd/indexservice/main.go 1>/dev/null +singlenode: build-cpp + @echo "Building singlenode ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/singlenode $(PWD)/cmd/singlenode/main.go 1>/dev/null + # Builds various components locally. build-go: build-cpp diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index 2e29edf0e1..4854d14a73 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -11,6 +11,7 @@ import ( "github.com/tecbot/gorocksdb" "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" @@ -125,16 +126,19 @@ func (rmq *RocksMQ) CreateChannel(channelName string) error { // Check if channel exist if rmq.checkKeyExist(beginKey) || rmq.checkKeyExist(endKey) { + log.Debug("RocksMQ: " + beginKey + " or " + endKey + " existed.") return nil } err := rmq.kv.Save(beginKey, "0") if err != nil { + log.Debug("RocksMQ: save " + beginKey + " failed.") return err } err = rmq.kv.Save(endKey, "0") if err != nil { + log.Debug("RocksMQ: save " + endKey + " failed.") return err } @@ -153,11 +157,13 @@ func (rmq *RocksMQ) DestroyChannel(channelName string) error { err := rmq.kv.Remove(beginKey) if err != nil { + log.Debug("RocksMQ: remove " + beginKey + " failed.") return err } err = rmq.kv.Remove(endKey) if err != nil { + log.Debug("RocksMQ: remove " + endKey + " failed.") return err } @@ -167,10 +173,12 @@ func (rmq *RocksMQ) DestroyChannel(channelName string) error { func (rmq *RocksMQ) CreateConsumerGroup(groupName string, channelName string) (*Consumer, error) { key := groupName + "/" + channelName + "/current_id" if rmq.checkKeyExist(key) { + log.Debug("RocksMQ: " + key + " existed.") return nil, fmt.Errorf("ConsumerGroup %s already exists", groupName) } err := rmq.kv.Save(key, DefaultMessageID) if err != nil { + log.Debug("RocksMQ: save " + key + " failed.") return nil, err } @@ -189,6 +197,7 @@ func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) e err := rmq.kv.Remove(key) if err != nil { + log.Debug("RocksMQ: remove " + key + " failed.") return err } @@ -202,10 +211,12 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) if err != nil { + log.Debug("RocksMQ: alloc id failed.") return err } if UniqueID(msgLen) != idEnd-idStart { + log.Debug("RocksMQ: Obtained id length is not equal that of message") return errors.New("Obtained id length is not equal that of message") } @@ -214,6 +225,7 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ { key, err := combKey(channelName, idStart+UniqueID(i)) if err != nil { + log.Debug("RocksMQ: combKey(" + channelName + "," + strconv.FormatInt(idStart+UniqueID(i), 10) + ")") return err } @@ -222,6 +234,7 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro err = rmq.store.Write(gorocksdb.NewDefaultWriteOptions(), batch) if err != nil { + log.Debug("RocksMQ: write batch failed") return err } @@ -229,12 +242,14 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro kvChannelBeginID := channelName + "/begin_id" beginIDValue, err := rmq.kv.Load(kvChannelBeginID) if err != nil { + log.Debug("RocksMQ: load " + kvChannelBeginID + " failed") return err } kvValues := make(map[string]string) if beginIDValue == "0" { + log.Debug("RocksMQ: overwrite " + kvChannelBeginID + " with " + strconv.FormatInt(idStart, 10)) kvValues[kvChannelBeginID] = strconv.FormatInt(idStart, 10) } @@ -243,6 +258,7 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro err = rmq.kv.MultiSave(kvValues) if err != nil { + log.Debug("RocksMQ: multisave failed") return err } @@ -260,6 +276,7 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons metaKey := groupName + "/" + channelName + "/current_id" currentID, err := rmq.kv.Load(metaKey) if err != nil { + log.Debug("RocksMQ: load " + metaKey + " failed") return nil, err } @@ -272,6 +289,7 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons fixChanName, err := fixChannelName(channelName) if err != nil { + log.Debug("RocksMQ: fixChannelName " + channelName + " failed") return nil, err } dataKey := fixChanName + "/" + currentID @@ -294,6 +312,7 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons offset++ msgID, err := strconv.ParseInt(string(key.Data())[FixedChannelNameLen+1:], 10, 64) if err != nil { + log.Debug("RocksMQ: parse int " + string(key.Data())[FixedChannelNameLen+1:] + " failed") return nil, err } msg := ConsumerMessage{ @@ -305,17 +324,20 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons val.Free() } if err := iter.Err(); err != nil { + log.Debug("RocksMQ: get error from iter.Err()") return nil, err } // When already consume to last mes, an empty slice will be returned if len(consumerMessage) == 0 { + log.Debug("RocksMQ: consumerMessage is empty") return consumerMessage, nil } newID := consumerMessage[len(consumerMessage)-1].MsgID err = rmq.Seek(groupName, channelName, newID) if err != nil { + log.Debug("RocksMQ: Seek(" + groupName + "," + channelName + "," + strconv.FormatInt(newID, 10) + ") failed") return nil, err } @@ -326,22 +348,26 @@ func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID UniqueID) e /* Step I: Check if key exists */ key := groupName + "/" + channelName + "/current_id" if !rmq.checkKeyExist(key) { + log.Debug("RocksMQ: channel " + key + " not exists") return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, channelName) } storeKey, err := combKey(channelName, msgID) if err != nil { + log.Debug("RocksMQ: combKey(" + channelName + "," + strconv.FormatInt(msgID, 10) + ") failed") return err } _, err = rmq.store.Get(gorocksdb.NewDefaultReadOptions(), []byte(storeKey)) if err != nil { + log.Debug("RocksMQ: get " + storeKey + " failed") return err } /* Step II: Save current_id in kv */ err = rmq.kv.Save(key, strconv.FormatInt(msgID, 10)) if err != nil { + log.Debug("RocksMQ: save " + key + " failed") return err }