update AddIndex not return error when fail (#5372)

Resolves: #5332 

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/5399/head
Cai Yudong 2021-05-25 11:42:23 +08:00 committed by GitHub
parent 4caf3051dd
commit 4058350e30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 15 deletions

View File

@ -94,6 +94,7 @@ func TestGrpcService(t *testing.T) {
collName2 = "testColl-again" collName2 = "testColl-again"
partName = "testPartition" partName = "testPartition"
fieldName = "vector" fieldName = "vector"
fieldID = 100
segID = 1001 segID = 1001
) )
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
@ -288,7 +289,7 @@ func TestGrpcService(t *testing.T) {
AutoID: true, AutoID: true,
Fields: []*schemapb.FieldSchema{ Fields: []*schemapb.FieldSchema{
{ {
FieldID: 100, FieldID: fieldID,
Name: fieldName, Name: fieldName,
IsPrimaryKey: false, IsPrimaryKey: false,
DataType: schemapb.DataType_FloatVector, DataType: schemapb.DataType_FloatVector,
@ -313,7 +314,7 @@ func TestGrpcService(t *testing.T) {
Timestamp: 100, Timestamp: 100,
SourceID: 100, SourceID: 100,
}, },
DbName: "testDb", DbName: dbName,
CollectionName: collName, CollectionName: collName,
Schema: sbf, Schema: sbf,
} }
@ -539,6 +540,14 @@ func TestGrpcService(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(part.SegmentIDs)) assert.Equal(t, 1, len(part.SegmentIDs))
// send msg twice, partition still contains 1 segment
segInfoMsgPack1 := GenSegInfoMsgPack(seg)
SegmentInfoChan <- segInfoMsgPack1
time.Sleep(time.Millisecond * 100)
part1, err := core.MetaTable.GetPartitionByID(1, partID, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(part1.SegmentIDs))
req := &milvuspb.ShowSegmentsRequest{ req := &milvuspb.ShowSegmentsRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowSegments, MsgType: commonpb.MsgType_ShowSegments,
@ -656,6 +665,16 @@ func TestGrpcService(t *testing.T) {
flushedSegMsgPack := GenFlushedSegMsgPack(segID) flushedSegMsgPack := GenFlushedSegMsgPack(segID)
FlushedSegmentChan <- flushedSegMsgPack FlushedSegmentChan <- flushedSegMsgPack
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
segIdxInfo, err := core.MetaTable.GetSegmentIndexInfoByID(segID, -1, "")
assert.Nil(t, err)
// send msg twice, segIdxInfo should not change
flushedSegMsgPack1 := GenFlushedSegMsgPack(segID)
FlushedSegmentChan <- flushedSegMsgPack1
time.Sleep(time.Millisecond * 100)
segIdxInfo1, err := core.MetaTable.GetSegmentIndexInfoByID(segID, -1, "")
assert.Nil(t, err)
assert.Equal(t, segIdxInfo, segIdxInfo1)
req := &milvuspb.DescribeIndexRequest{ req := &milvuspb.DescribeIndexRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{

View File

@ -328,18 +328,18 @@ func (c *Core) startDataServiceSegmentLoop() {
segInfos = append(segInfos, segInfoMsg.Segment) segInfos = append(segInfos, segInfoMsg.Segment)
} }
if len(segInfos) > 0 { if len(segInfos) > 0 {
startPosByte, err := json.Marshal(segMsg.StartPositions) startPosStr, err := EncodeMsgPositions(segMsg.StartPositions)
if err != nil { if err != nil {
log.Error("json.Marshal fail", zap.String("err", err.Error())) log.Error("encode msg start positions fail", zap.String("err", err.Error()))
continue continue
} }
endPosByte, err := json.Marshal(segMsg.EndPositions) endPosStr, err := EncodeMsgPositions(segMsg.EndPositions)
if err != nil { if err != nil {
log.Error("json.Marshal fail", zap.String("err", err.Error())) log.Error("encode msg end positions fail", zap.String("err", err.Error()))
continue continue
} }
if _, err := c.MetaTable.AddSegment(segInfos, string(startPosByte), string(endPosByte)); err != nil { if _, err := c.MetaTable.AddSegment(segInfos, startPosStr, endPosStr); err != nil {
//what if master add segment failed, but data service success? //what if master add segment failed, but data service success?
log.Debug("add segment info meta table failed ", zap.String("error", err.Error())) log.Debug("add segment info meta table failed ", zap.String("error", err.Error()))
continue continue
@ -362,14 +362,14 @@ func (c *Core) startDataNodeFlushedSegmentLoop() {
return return
} }
startPosByte, err := json.Marshal(segMsg.StartPositions) startPosStr, err := EncodeMsgPositions(segMsg.StartPositions)
if err != nil { if err != nil {
log.Error("json.Marshal fail", zap.String("err", err.Error())) log.Error("encode msg start positions fail", zap.String("err", err.Error()))
continue continue
} }
endPosByte, err := json.Marshal(segMsg.EndPositions) endPosStr, err := EncodeMsgPositions(segMsg.EndPositions)
if err != nil { if err != nil {
log.Error("json.Marshal fail", zap.String("err", err.Error())) log.Error("encode msg end positions fail", zap.String("err", err.Error()))
continue continue
} }
@ -424,13 +424,15 @@ func (c *Core) startDataNodeFlushedSegmentLoop() {
} }
} }
_, err = c.MetaTable.AddIndex(segIdxInfos, string(startPosByte), string(endPosByte)) if len(segIdxInfos) > 0 {
_, err = c.MetaTable.AddIndex(segIdxInfos, startPosStr, endPosStr)
if err != nil { if err != nil {
log.Error("AddIndex fail", zap.String("err", err.Error())) log.Error("AddIndex fail", zap.String("err", err.Error()))
} }
} }
} }
} }
}
func (c *Core) tsLoop() { func (c *Core) tsLoop() {
tsoTicker := time.NewTicker(tso.UpdateTimestampStep) tsoTicker := time.NewTicker(tso.UpdateTimestampStep)

View File

@ -787,7 +787,7 @@ func (mt *metaTable) AddIndex(segIdxInfos []*pb.SegmentIndexInfo, msgStartPos st
if ok { if ok {
if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) { if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID)) log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
return 0, nil continue
} }
return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID) return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
} }

View File

@ -16,6 +16,7 @@ import (
"fmt" "fmt"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
@ -93,3 +94,15 @@ func SegmentIndexInfoEqual(info1 *etcdpb.SegmentIndexInfo, info2 *etcdpb.Segment
info1.BuildID == info2.BuildID && info1.BuildID == info2.BuildID &&
info1.EnableIndex == info2.EnableIndex info1.EnableIndex == info2.EnableIndex
} }
// EncodeMsgPositions serialize []*MsgPosition into string
func EncodeMsgPositions(msgPositions []*msgstream.MsgPosition) (string, error) {
if len(msgPositions) == 0 {
return "", nil
}
resByte, err := json.Marshal(msgPositions)
if err != nil {
return "", err
}
return string(resByte), nil
}