mirror of https://github.com/milvus-io/milvus.git
simplify AddIndex, remove collID,PartID from parameters (#6291)
Signed-off-by: yefu.chen <yefu.chen@zilliz.com>pull/6306/head
parent
0c70370374
commit
ca4a562a72
|
@ -624,13 +624,13 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
|
|||
return ts, partID, nil
|
||||
}
|
||||
|
||||
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, collID, partID typeutil.UniqueID) (typeutil.Timestamp, error) {
|
||||
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) {
|
||||
mt.ddLock.Lock()
|
||||
defer mt.ddLock.Unlock()
|
||||
|
||||
collMeta, ok := mt.collID2Meta[collID]
|
||||
collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID]
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("collection id = %d not found", collID)
|
||||
return 0, fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
|
||||
}
|
||||
exist := false
|
||||
for _, fidx := range collMeta.FieldIndexes {
|
||||
|
@ -666,7 +666,7 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, collID, partID ty
|
|||
mt.segID2IndexMeta[segIdxInfo.SegmentID][segIdxInfo.IndexID] = *segIdxInfo
|
||||
mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true
|
||||
|
||||
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID)
|
||||
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
|
||||
v := proto.MarshalTextString(segIdxInfo)
|
||||
|
||||
ts, err := mt.client.Save(k, v)
|
||||
|
|
|
@ -278,20 +278,22 @@ func TestMetaTable(t *testing.T) {
|
|||
|
||||
t.Run("add segment index", func(t *testing.T) {
|
||||
segIdxInfo := pb.SegmentIndexInfo{
|
||||
SegmentID: segID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID,
|
||||
BuildID: buildID,
|
||||
CollectionID: collID,
|
||||
PartitionID: partID,
|
||||
SegmentID: segID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID,
|
||||
BuildID: buildID,
|
||||
}
|
||||
_, err := mt.AddIndex(&segIdxInfo, collID, partID)
|
||||
_, err := mt.AddIndex(&segIdxInfo)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// it's legal to add index twice
|
||||
_, err = mt.AddIndex(&segIdxInfo, collID, partID)
|
||||
_, err = mt.AddIndex(&segIdxInfo)
|
||||
assert.Nil(t, err)
|
||||
|
||||
segIdxInfo.BuildID = 202
|
||||
_, err = mt.AddIndex(&segIdxInfo, collID, partID)
|
||||
_, err = mt.AddIndex(&segIdxInfo)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID))
|
||||
})
|
||||
|
@ -615,17 +617,19 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
segIdxInfo := pb.SegmentIndexInfo{
|
||||
SegmentID: segID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID2,
|
||||
BuildID: buildID,
|
||||
CollectionID: collID,
|
||||
PartitionID: partID,
|
||||
SegmentID: segID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID2,
|
||||
BuildID: buildID,
|
||||
}
|
||||
_, err = mt.AddIndex(&segIdxInfo, collID, partID)
|
||||
_, err = mt.AddIndex(&segIdxInfo)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID))
|
||||
|
||||
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
|
||||
_, err = mt.AddIndex(&segIdxInfo, collID, partID)
|
||||
_, err = mt.AddIndex(&segIdxInfo)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID))
|
||||
|
||||
|
@ -641,7 +645,7 @@ func TestMetaTable(t *testing.T) {
|
|||
mockKV.save = func(key, value string) (typeutil.Timestamp, error) {
|
||||
return 0, fmt.Errorf("save error")
|
||||
}
|
||||
_, err = mt.AddIndex(&segIdxInfo, collID, partID)
|
||||
_, err = mt.AddIndex(&segIdxInfo)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "save error")
|
||||
})
|
||||
|
@ -734,12 +738,14 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.Equal(t, false, seg.EnableIndex)
|
||||
|
||||
segIdxInfo := pb.SegmentIndexInfo{
|
||||
SegmentID: segID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID,
|
||||
BuildID: buildID,
|
||||
CollectionID: collID,
|
||||
PartitionID: partID,
|
||||
SegmentID: segID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID,
|
||||
BuildID: buildID,
|
||||
}
|
||||
_, err = mt.AddIndex(&segIdxInfo, collID, partID)
|
||||
_, err = mt.AddIndex(&segIdxInfo)
|
||||
assert.Nil(t, err)
|
||||
idx, err := mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, segIdxInfo.FieldID, idxInfo[0].IndexName)
|
||||
assert.Nil(t, err)
|
||||
|
|
|
@ -330,10 +330,12 @@ func (c *Core) checkFlushedSegmentsLoop() {
|
|||
continue
|
||||
}
|
||||
info := etcdpb.SegmentIndexInfo{
|
||||
SegmentID: segID,
|
||||
FieldID: idxInfo.FiledID,
|
||||
IndexID: idxInfo.IndexID,
|
||||
EnableIndex: false,
|
||||
CollectionID: collMeta.ID,
|
||||
PartitionID: partID,
|
||||
SegmentID: segID,
|
||||
FieldID: idxInfo.FiledID,
|
||||
IndexID: idxInfo.IndexID,
|
||||
EnableIndex: false,
|
||||
}
|
||||
log.Debug("build index by background checker",
|
||||
zap.Int64("segment_id", segID),
|
||||
|
@ -350,7 +352,7 @@ func (c *Core) checkFlushedSegmentsLoop() {
|
|||
if info.BuildID != 0 {
|
||||
info.EnableIndex = true
|
||||
}
|
||||
if _, err := c.MetaTable.AddIndex(&info, collMeta.ID, partID); err != nil {
|
||||
if _, err := c.MetaTable.AddIndex(&info); err != nil {
|
||||
log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
@ -1886,10 +1888,12 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
|
|||
}
|
||||
|
||||
info := etcdpb.SegmentIndexInfo{
|
||||
SegmentID: segID,
|
||||
FieldID: fieldSch.FieldID,
|
||||
IndexID: idxInfo.IndexID,
|
||||
EnableIndex: false,
|
||||
CollectionID: in.Segment.CollectionID,
|
||||
PartitionID: in.Segment.PartitionID,
|
||||
SegmentID: segID,
|
||||
FieldID: fieldSch.FieldID,
|
||||
IndexID: idxInfo.IndexID,
|
||||
EnableIndex: false,
|
||||
}
|
||||
info.BuildID, err = c.BuildIndex(ctx, segID, fieldSch, idxInfo, true)
|
||||
if err == nil && info.BuildID != 0 {
|
||||
|
@ -1898,7 +1902,7 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
|
|||
log.Error("build index fail", zap.Int64("buildid", info.BuildID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
_, err = c.MetaTable.AddIndex(&info, in.Segment.CollectionID, in.Segment.PartitionID)
|
||||
_, err = c.MetaTable.AddIndex(&info)
|
||||
if err != nil {
|
||||
log.Error("AddIndex fail", zap.String("err", err.Error()))
|
||||
}
|
||||
|
|
|
@ -743,8 +743,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
|
|||
if info.BuildID != 0 {
|
||||
info.EnableIndex = true
|
||||
}
|
||||
partID := segID2PartID[segID]
|
||||
if _, err := t.core.MetaTable.AddIndex(&info, collMeta.ID, partID); err != nil {
|
||||
if _, err := t.core.MetaTable.AddIndex(&info); err != nil {
|
||||
log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue