Remove the etcd deletion of segment data path on querynode (#11863)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/11863/merge
Xiaofan 2021-11-17 12:13:10 +08:00 committed by GitHub
parent a6a7dc9c8f
commit f654621650
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 89 additions and 102 deletions

View File

@ -31,15 +31,14 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/mqclient" "github.com/milvus-io/milvus/internal/util/mqclient"
) )
const ( const (
collectionMetaPrefix = "queryCoord-collectionMeta" collectionMetaPrefix = "queryCoord-collectionMeta"
segmentMetaPrefix = "queryCoord-segmentMeta"
queryChannelMetaPrefix = "queryCoord-queryChannel" queryChannelMetaPrefix = "queryCoord-queryChannel"
deltaChannelMetaPrefix = "queryCoord-deltaChannel" deltaChannelMetaPrefix = "queryCoord-deltaChannel"
sealedSegmentChangeInfoPrefix = "queryCoord-sealedSegmentChangeInfo"
globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition" globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition"
) )
@ -164,7 +163,7 @@ func (m *MetaReplica) reloadFromKV() error {
m.collectionInfos[collectionID] = collectionInfo m.collectionInfos[collectionID] = collectionInfo
} }
segmentKeys, segmentValues, err := m.client.LoadWithPrefix(segmentMetaPrefix) segmentKeys, segmentValues, err := m.client.LoadWithPrefix(util.SegmentMetaPrefix)
if err != nil { if err != nil {
return err return err
} }
@ -413,7 +412,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
col2SegmentChangeInfos := make(col2SealedSegmentChangeInfos) col2SegmentChangeInfos := make(col2SealedSegmentChangeInfos)
segmentsCompactionFrom := make([]UniqueID, 0) segmentsCompactionFrom := make([]UniqueID, 0)
// get segmentInfos to sav // get segmentInfos to colSegmentInfos
for collectionID, onlineInfos := range saves { for collectionID, onlineInfos := range saves {
segmentsChangeInfo := &querypb.SealedSegmentsChangeInfo{ segmentsChangeInfo := &querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
@ -508,7 +507,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
if err != nil { if err != nil {
return col2SegmentChangeInfos, err return col2SegmentChangeInfos, err
} }
segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID) segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, info.SegmentID)
segmentInfoKvs[segmentKey] = string(segmentInfoBytes) segmentInfoKvs[segmentKey] = string(segmentInfoBytes)
} }
} }
@ -521,7 +520,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
// remove compacted segment info from etcd // remove compacted segment info from etcd
for _, segmentID := range segmentsCompactionFrom { for _, segmentID := range segmentsCompactionFrom {
segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, segmentID) segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID)
err := m.client.Remove(segmentKey) err := m.client.Remove(segmentKey)
if err != nil { if err != nil {
panic(err) panic(err)
@ -553,7 +552,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
return col2SegmentChangeInfos, err return col2SegmentChangeInfos, err
} }
// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg // TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, changeInfos.Base.MsgID) changeInfoKey := fmt.Sprintf("%s/%d", util.ChangeInfoMetaPrefix, changeInfos.Base.MsgID)
saveKvs[changeInfoKey] = string(changeInfoBytes) saveKvs[changeInfoKey] = string(changeInfoBytes)
} }
@ -644,7 +643,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
// remove meta from etcd // remove meta from etcd
for _, info := range removes { for _, info := range removes {
segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID) segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, info.SegmentID)
err = m.client.Remove(segmentKey) err = m.client.Remove(segmentKey)
if err != nil { if err != nil {
panic(err) panic(err)
@ -673,7 +672,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
} }
// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg // TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, segmentChangeInfos.Base.MsgID) changeInfoKey := fmt.Sprintf("%s/%d", util.ChangeInfoMetaPrefix, segmentChangeInfos.Base.MsgID)
saveKvs[changeInfoKey] = string(changeInfoBytes) saveKvs[changeInfoKey] = string(changeInfoBytes)
err = m.client.MultiSave(saveKvs) err = m.client.MultiSave(saveKvs)
@ -695,6 +694,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil
} }
// send sealed segment change infos into query channels
func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) { func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) {
// get msgStream to produce sealedSegmentChangeInfos to query channel // get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, err := m.getQueryChannelInfoByID(collectionID) queryChannelInfo, err := m.getQueryChannelInfoByID(collectionID)
@ -1179,7 +1179,7 @@ func multiSaveSegmentInfos(segmentInfos map[UniqueID]*querypb.SegmentInfo, kv kv
if err != nil { if err != nil {
return err return err
} }
key := fmt.Sprintf("%s/%d", segmentMetaPrefix, segmentID) key := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID)
kvs[key] = string(infoBytes) kvs[key] = string(infoBytes)
} }
@ -1189,7 +1189,7 @@ func multiSaveSegmentInfos(segmentInfos map[UniqueID]*querypb.SegmentInfo, kv kv
func multiRemoveSegmentInfo(segmentIDs []UniqueID, kv kv.MetaKv) error { func multiRemoveSegmentInfo(segmentIDs []UniqueID, kv kv.MetaKv) error {
keys := make([]string, 0) keys := make([]string, 0)
for _, segmentID := range segmentIDs { for _, segmentID := range segmentIDs {
key := fmt.Sprintf("%s/%d", segmentMetaPrefix, segmentID) key := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID)
keys = append(keys, key) keys = append(keys, key)
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util"
) )
func successResult() error { return nil } func successResult() error { return nil }
@ -329,7 +330,7 @@ func TestReloadMetaFromKV(t *testing.T) {
} }
segmentBlobs, err := proto.Marshal(segmentInfo) segmentBlobs, err := proto.Marshal(segmentInfo)
assert.Nil(t, err) assert.Nil(t, err)
segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, defaultSegmentID) segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, defaultSegmentID)
kvs[segmentKey] = string(segmentBlobs) kvs[segmentKey] = string(segmentBlobs)
queryChannelInfo := &querypb.QueryChannelInfo{ queryChannelInfo := &querypb.QueryChannelInfo{

View File

@ -954,6 +954,7 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
} }
if err != nil { if err != nil {
log.Error("Failed to update global sealed seg infos, begin to rollback", zap.Error(err))
rollBackSegmentChangeInfoErr := retry.Do(ctx, func() error { rollBackSegmentChangeInfoErr := retry.Do(ctx, func() error {
rollBackChangeInfos := reverseSealedSegmentChangeInfo(sealedSegmentChangeInfos) rollBackChangeInfos := reverseSealedSegmentChangeInfo(sealedSegmentChangeInfos)
for collectionID, infos := range rollBackChangeInfos { for collectionID, infos := range rollBackChangeInfos {
@ -967,6 +968,7 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
if rollBackSegmentChangeInfoErr != nil { if rollBackSegmentChangeInfoErr != nil {
log.Error("scheduleLoop: Restore the information of global sealed segments in query node failed", zap.Error(rollBackSegmentChangeInfoErr)) log.Error("scheduleLoop: Restore the information of global sealed segments in query node failed", zap.Error(rollBackSegmentChangeInfoErr))
} }
log.Info("Successfully roll back segment info change")
return err return err
} }

View File

@ -558,12 +558,6 @@ func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) er
partition.removeSegmentID(segmentID) partition.removeSegmentID(segmentID)
delete(colReplica.segments, segmentID) delete(colReplica.segments, segmentID)
deleteSegment(segment) deleteSegment(segment)
key := fmt.Sprintf("%s/%d", queryNodeSegmentMetaPrefix, segmentID)
err = colReplica.etcdKV.Remove(key)
if err != nil {
log.Warn("error when remove segment info from etcd")
}
return nil return nil
} }

View File

@ -32,18 +32,16 @@ func newGlobalSealedSegmentManager(collectionID UniqueID) *globalSealedSegmentMa
} }
} }
func (g *globalSealedSegmentManager) addGlobalSegmentInfo(segmentInfo *querypb.SegmentInfo) error { func (g *globalSealedSegmentManager) addGlobalSegmentInfo(segmentInfo *querypb.SegmentInfo) {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
if segmentInfo.CollectionID != g.collectionID { if segmentInfo.CollectionID != g.collectionID {
log.Debug("mismatch collectionID when addGlobalSegmentInfo", log.Warn("Find mismatch collectionID when addGlobalSegmentInfo",
zap.Any("manager collectionID", g.collectionID), zap.Any("manager collectionID", g.collectionID),
zap.Any("segmentInfo collectionID", segmentInfo.CollectionID), zap.Any("segmentInfo collectionID", segmentInfo.CollectionID),
) )
return nil
} }
g.globalSealedSegments[segmentInfo.SegmentID] = segmentInfo g.globalSealedSegments[segmentInfo.SegmentID] = segmentInfo
return nil
} }
func (g *globalSealedSegmentManager) getGlobalSegmentIDs() []UniqueID { func (g *globalSealedSegmentManager) getGlobalSegmentIDs() []UniqueID {
@ -70,14 +68,14 @@ func (g *globalSealedSegmentManager) getGlobalSegmentIDsByPartitionIds(partition
return resIDs return resIDs
} }
func (g *globalSealedSegmentManager) hasGlobalSegment(segmentID UniqueID) bool { func (g *globalSealedSegmentManager) hasGlobalSealedSegment(segmentID UniqueID) bool {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
_, ok := g.globalSealedSegments[segmentID] _, ok := g.globalSealedSegments[segmentID]
return ok return ok
} }
func (g *globalSealedSegmentManager) removeGlobalSegmentInfo(segmentID UniqueID) { func (g *globalSealedSegmentManager) removeGlobalSealedSegmentInfo(segmentID UniqueID) {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
delete(g.globalSealedSegments, segmentID) delete(g.globalSealedSegments, segmentID)

View File

@ -29,13 +29,10 @@ func TestGlobalSealedSegmentManager(t *testing.T) {
PartitionID: defaultPartitionID, PartitionID: defaultPartitionID,
} }
err := manager.addGlobalSegmentInfo(segmentInfo) manager.addGlobalSegmentInfo(segmentInfo)
assert.NoError(t, err)
segmentInfo.CollectionID = 1000 segmentInfo.CollectionID = 1000
err = manager.addGlobalSegmentInfo(segmentInfo) manager.addGlobalSegmentInfo(segmentInfo)
assert.NoError(t, err)
ids := manager.getGlobalSegmentIDs() ids := manager.getGlobalSegmentIDs()
assert.Len(t, ids, 1) assert.Len(t, ids, 1)
assert.Equal(t, segmentInfo.SegmentID, ids[0]) assert.Equal(t, segmentInfo.SegmentID, ids[0])
@ -49,19 +46,17 @@ func TestGlobalSealedSegmentManager(t *testing.T) {
assert.Len(t, ids, 0) assert.Len(t, ids, 0)
segmentInfo.CollectionID = defaultCollectionID segmentInfo.CollectionID = defaultCollectionID
err = manager.addGlobalSegmentInfo(segmentInfo) manager.addGlobalSegmentInfo(segmentInfo)
assert.NoError(t, err)
manager.removeGlobalSegmentInfo(defaultSegmentID) manager.removeGlobalSealedSegmentInfo(defaultSegmentID)
ids = manager.getGlobalSegmentIDs() ids = manager.getGlobalSegmentIDs()
assert.Len(t, ids, 0) assert.Len(t, ids, 0)
has := manager.hasGlobalSegment(defaultSegmentID) has := manager.hasGlobalSealedSegment(defaultSegmentID)
assert.False(t, has) assert.False(t, has)
segmentInfo.CollectionID = defaultCollectionID segmentInfo.CollectionID = defaultCollectionID
err = manager.addGlobalSegmentInfo(segmentInfo) manager.addGlobalSegmentInfo(segmentInfo)
assert.NoError(t, err)
manager.close() manager.close()
ids = manager.getGlobalSegmentIDs() ids = manager.getGlobalSegmentIDs()

View File

@ -28,10 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
) "github.com/milvus-io/milvus/internal/util"
const (
segmentMetaPrefix = "queryCoord-segmentMeta"
) )
// historical is in charge of historical data in query node // historical is in charge of historical data in query node
@ -73,7 +70,7 @@ func (h *historical) close() {
func (h *historical) watchGlobalSegmentMeta() { func (h *historical) watchGlobalSegmentMeta() {
log.Debug("query node watchGlobalSegmentMeta start") log.Debug("query node watchGlobalSegmentMeta start")
watchChan := h.etcdKV.WatchWithPrefix(segmentMetaPrefix) watchChan := h.etcdKV.WatchWithPrefix(util.SegmentMetaPrefix)
for { for {
select { select {

View File

@ -18,9 +18,9 @@ import (
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util"
"github.com/stretchr/testify/assert"
) )
func TestHistorical_GlobalSealedSegments(t *testing.T) { func TestHistorical_GlobalSealedSegments(t *testing.T) {
@ -71,7 +71,7 @@ func TestHistorical_GlobalSealedSegments(t *testing.T) {
segmentInfoBytes, err := proto.Marshal(segmentInfo) segmentInfoBytes, err := proto.Marshal(segmentInfo)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, n.etcdKV) assert.NotNil(t, n.etcdKV)
segmentKey := segmentMetaPrefix + "/" + strconv.FormatInt(segmentID, 10) segmentKey := util.SegmentMetaPrefix + "/" + strconv.FormatInt(segmentID, 10)
err = n.etcdKV.Save(segmentKey, string(segmentInfoBytes)) err = n.etcdKV.Save(segmentKey, string(segmentInfoBytes))
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -165,14 +165,7 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
// init global sealed segments // init global sealed segments
for _, segment := range in.GlobalSealedSegments { for _, segment := range in.GlobalSealedSegments {
err = sc.globalSegmentManager.addGlobalSegmentInfo(segment) sc.globalSegmentManager.addGlobalSegmentInfo(segment)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
return status, err
}
} }
// start queryCollection, message stream need to asConsumer before start // start queryCollection, message stream need to asConsumer before start

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/sessionutil"
) )
@ -1289,7 +1290,7 @@ func saveChangeInfo(key string, value string) error {
return err return err
} }
key = changeInfoMetaPrefix + "/" + key key = util.ChangeInfoMetaPrefix + "/" + key
return kv.Save(key, value) return kv.Save(key, value)
} }

View File

@ -303,11 +303,7 @@ func (q *queryCollection) consumeQuery() {
log.Warn(err.Error()) log.Warn(err.Error())
} }
case *msgstream.SealedSegmentsChangeInfoMsg: case *msgstream.SealedSegmentsChangeInfoMsg:
err := q.adjustByChangeInfo(sm) q.adjustByChangeInfo(sm)
if err != nil {
// should not happen
log.Error(err.Error())
}
default: default:
log.Warn("unsupported msg type in search channel", zap.Any("msg", sm)) log.Warn("unsupported msg type in search channel", zap.Any("msg", sm))
} }
@ -316,15 +312,25 @@ func (q *queryCollection) consumeQuery() {
} }
} }
func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChangeInfoMsg) error { func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChangeInfoMsg) {
for _, info := range msg.Infos { for _, info := range msg.Infos {
// precheck collection id, if not the same collection, skip
for _, segment := range info.OnlineSegments {
if segment.CollectionID != q.collectionID {
return
}
}
for _, segment := range info.OfflineSegments {
if segment.CollectionID != q.collectionID {
return
}
}
// for OnlineSegments: // for OnlineSegments:
for _, segment := range info.OnlineSegments { for _, segment := range info.OnlineSegments {
// 1. update global sealed segments // 1. update global sealed segments
err := q.globalSegmentManager.addGlobalSegmentInfo(segment) q.globalSegmentManager.addGlobalSegmentInfo(segment)
if err != nil {
return err
}
// 2. update excluded segment, cluster have been loaded sealed segments, // 2. update excluded segment, cluster have been loaded sealed segments,
// so we need to avoid getting growing segment from flow graph. // so we need to avoid getting growing segment from flow graph.
q.streaming.replica.addExcludedSegments(segment.CollectionID, []*datapb.SegmentInfo{ q.streaming.replica.addExcludedSegments(segment.CollectionID, []*datapb.SegmentInfo{
@ -346,10 +352,14 @@ func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChange
// for OfflineSegments: // for OfflineSegments:
for _, segment := range info.OfflineSegments { for _, segment := range info.OfflineSegments {
// 1. update global sealed segments // 1. update global sealed segments
q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID) q.globalSegmentManager.removeGlobalSealedSegmentInfo(segment.SegmentID)
} }
log.Info("Successfully changed global sealed segment info ",
zap.Int64("collection ", q.collectionID),
zap.Any("online segments ", info.OnlineSegments),
zap.Any("offline segments ", info.OfflineSegments))
} }
return nil
} }
func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {

View File

@ -659,16 +659,14 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) {
// test online // test online
segmentChangeInfos.Infos[0].OnlineSegments = append(segmentChangeInfos.Infos[0].OnlineSegments, genSimpleSegmentInfo()) segmentChangeInfos.Infos[0].OnlineSegments = append(segmentChangeInfos.Infos[0].OnlineSegments, genSimpleSegmentInfo())
err = qc.adjustByChangeInfo(segmentChangeInfos) qc.adjustByChangeInfo(segmentChangeInfos)
assert.NoError(t, err)
ids := qc.globalSegmentManager.getGlobalSegmentIDs() ids := qc.globalSegmentManager.getGlobalSegmentIDs()
assert.Len(t, ids, 1) assert.Len(t, ids, 1)
// test offline // test offline
segmentChangeInfos.Infos[0].OnlineSegments = make([]*querypb.SegmentInfo, 0) segmentChangeInfos.Infos[0].OnlineSegments = make([]*querypb.SegmentInfo, 0)
segmentChangeInfos.Infos[0].OfflineSegments = append(segmentChangeInfos.Infos[0].OfflineSegments, genSimpleSegmentInfo()) segmentChangeInfos.Infos[0].OfflineSegments = append(segmentChangeInfos.Infos[0].OfflineSegments, genSimpleSegmentInfo())
err = qc.adjustByChangeInfo(segmentChangeInfos) qc.adjustByChangeInfo(segmentChangeInfos)
assert.NoError(t, err)
ids = qc.globalSegmentManager.getGlobalSegmentIDs() ids = qc.globalSegmentManager.getGlobalSegmentIDs()
assert.Len(t, ids, 0) assert.Len(t, ids, 0)
}) })
@ -683,8 +681,7 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) {
simpleInfo := genSimpleSegmentInfo() simpleInfo := genSimpleSegmentInfo()
simpleInfo.CollectionID = 1000 simpleInfo.CollectionID = 1000
segmentChangeInfos.Infos[0].OnlineSegments = append(segmentChangeInfos.Infos[0].OnlineSegments, simpleInfo) segmentChangeInfos.Infos[0].OnlineSegments = append(segmentChangeInfos.Infos[0].OnlineSegments, simpleInfo)
err = qc.adjustByChangeInfo(segmentChangeInfos) qc.adjustByChangeInfo(segmentChangeInfos)
assert.NoError(t, err)
}) })
t.Run("test no segment when adjustByChangeInfo", func(t *testing.T) { t.Run("test no segment when adjustByChangeInfo", func(t *testing.T) {
@ -697,8 +694,7 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) {
segmentChangeInfos := genSimpleSealedSegmentsChangeInfoMsg() segmentChangeInfos := genSimpleSealedSegmentsChangeInfoMsg()
segmentChangeInfos.Infos[0].OfflineSegments = append(segmentChangeInfos.Infos[0].OfflineSegments, genSimpleSegmentInfo()) segmentChangeInfos.Infos[0].OfflineSegments = append(segmentChangeInfos.Infos[0].OfflineSegments, genSimpleSegmentInfo())
err = qc.adjustByChangeInfo(segmentChangeInfos) qc.adjustByChangeInfo(segmentChangeInfos)
assert.Nil(t, err)
}) })
} }

View File

@ -46,13 +46,12 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
) )
const changeInfoMetaPrefix = "queryCoord-sealedSegmentChangeInfo"
// make sure QueryNode implements types.QueryNode // make sure QueryNode implements types.QueryNode
var _ types.QueryNode = (*QueryNode)(nil) var _ types.QueryNode = (*QueryNode)(nil)
@ -327,8 +326,7 @@ func (node *QueryNode) SetIndexCoord(index types.IndexCoord) error {
func (node *QueryNode) watchChangeInfo() { func (node *QueryNode) watchChangeInfo() {
log.Debug("query node watchChangeInfo start") log.Debug("query node watchChangeInfo start")
watchChan := node.etcdKV.WatchWithPrefix(changeInfoMetaPrefix) watchChan := node.etcdKV.WatchWithPrefix(util.ChangeInfoMetaPrefix)
for { for {
select { select {
case <-node.queryNodeLoopCtx.Done(): case <-node.queryNodeLoopCtx.Done():
@ -353,9 +351,9 @@ func (node *QueryNode) watchChangeInfo() {
continue continue
} }
go func() { go func() {
err = node.adjustByChangeInfo(info) err = node.removeSegments(info)
if err != nil { if err != nil {
log.Warn("adjustByChangeInfo failed", zap.Any("error", err.Error())) log.Warn("cleanup segments failed", zap.Any("error", err.Error()))
} }
}() }()
default: default:
@ -370,6 +368,7 @@ func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegments
fn := func() error { fn := func() error {
for _, info := range segmentChangeInfos.Infos { for _, info := range segmentChangeInfos.Infos {
canDoLoadBalance := true canDoLoadBalance := true
// make sure all query channel already received segment location changes
// Check online segments: // Check online segments:
for _, segmentInfo := range info.OnlineSegments { for _, segmentInfo := range info.OnlineSegments {
if node.queryService.hasQueryCollection(segmentInfo.CollectionID) { if node.queryService.hasQueryCollection(segmentInfo.CollectionID) {
@ -378,7 +377,7 @@ func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegments
canDoLoadBalance = false canDoLoadBalance = false
break break
} }
if info.OnlineNodeID == Params.QueryNodeID && !qc.globalSegmentManager.hasGlobalSegment(segmentInfo.SegmentID) { if info.OnlineNodeID == Params.QueryNodeID && !qc.globalSegmentManager.hasGlobalSealedSegment(segmentInfo.SegmentID) {
canDoLoadBalance = false canDoLoadBalance = false
break break
} }
@ -392,7 +391,7 @@ func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegments
canDoLoadBalance = false canDoLoadBalance = false
break break
} }
if info.OfflineNodeID == Params.QueryNodeID && qc.globalSegmentManager.hasGlobalSegment(segmentInfo.SegmentID) { if info.OfflineNodeID == Params.QueryNodeID && qc.globalSegmentManager.hasGlobalSealedSegment(segmentInfo.SegmentID) {
canDoLoadBalance = false canDoLoadBalance = false
break break
} }
@ -407,13 +406,13 @@ func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegments
return nil return nil
} }
return retry.Do(context.TODO(), fn, retry.Attempts(10)) return retry.Do(context.TODO(), fn, retry.Attempts(50))
} }
func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegmentsChangeInfo) error { // remove the segments since it's already compacted or balanced to other querynodes
func (node *QueryNode) removeSegments(segmentChangeInfos *querypb.SealedSegmentsChangeInfo) error {
err := node.waitChangeInfo(segmentChangeInfos) err := node.waitChangeInfo(segmentChangeInfos)
if err != nil { if err != nil {
log.Error("waitChangeInfo failed", zap.Any("error", err.Error()))
return err return err
} }
@ -429,10 +428,9 @@ func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegm
if hasGrowingSegment { if hasGrowingSegment {
err := node.streaming.replica.removeSegment(segmentInfo.SegmentID) err := node.streaming.replica.removeSegment(segmentInfo.SegmentID)
if err != nil { if err != nil {
return err return err
} }
log.Debug("remove growing segment in adjustByChangeInfo", log.Debug("remove growing segment in removeSegments",
zap.Any("collectionID", segmentInfo.CollectionID), zap.Any("collectionID", segmentInfo.CollectionID),
zap.Any("segmentID", segmentInfo.SegmentID), zap.Any("segmentID", segmentInfo.SegmentID),
zap.Any("infoID", segmentChangeInfos.Base.GetMsgID()), zap.Any("infoID", segmentChangeInfos.Base.GetMsgID()),
@ -441,13 +439,17 @@ func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegm
} }
// For offline segments: // For offline segments:
for _, segment := range info.OfflineSegments { for _, segmentInfo := range info.OfflineSegments {
// load balance or compaction, remove old sealed segments. // load balance or compaction, remove old sealed segments.
if info.OfflineNodeID == Params.QueryNodeID { if info.OfflineNodeID == Params.QueryNodeID {
err := node.historical.replica.removeSegment(segment.SegmentID) err := node.historical.replica.removeSegment(segmentInfo.SegmentID)
if err != nil { if err != nil {
return err return err
} }
log.Debug("remove sealed segment", zap.Any("collectionID", segmentInfo.CollectionID),
zap.Any("segmentID", segmentInfo.SegmentID),
zap.Any("infoID", segmentChangeInfos.Base.GetMsgID()),
)
} }
} }
} }

View File

@ -295,11 +295,7 @@ func genSimpleQueryNodeToTestWatchChangeInfo(ctx context.Context) (*QueryNode, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = qc.globalSegmentManager.addGlobalSegmentInfo(genSimpleSegmentInfo()) qc.globalSegmentManager.addGlobalSegmentInfo(genSimpleSegmentInfo())
if err != nil {
return nil, err
}
return node, nil return node, nil
} }
@ -318,15 +314,15 @@ func TestQueryNode_adjustByChangeInfo(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
t.Run("test adjustByChangeInfo", func(t *testing.T) { t.Run("test cleanup segments", func(t *testing.T) {
node, err := genSimpleQueryNodeToTestWatchChangeInfo(ctx) node, err := genSimpleQueryNodeToTestWatchChangeInfo(ctx)
assert.NoError(t, err) assert.NoError(t, err)
err = node.adjustByChangeInfo(genSimpleChangeInfo()) err = node.removeSegments(genSimpleChangeInfo())
assert.NoError(t, err) assert.NoError(t, err)
}) })
t.Run("test adjustByChangeInfo no segment", func(t *testing.T) { t.Run("test cleanup segments no segment", func(t *testing.T) {
node, err := genSimpleQueryNodeToTestWatchChangeInfo(ctx) node, err := genSimpleQueryNodeToTestWatchChangeInfo(ctx)
assert.NoError(t, err) assert.NoError(t, err)
@ -339,9 +335,9 @@ func TestQueryNode_adjustByChangeInfo(t *testing.T) {
qc, err := node.queryService.getQueryCollection(defaultCollectionID) qc, err := node.queryService.getQueryCollection(defaultCollectionID)
assert.NoError(t, err) assert.NoError(t, err)
qc.globalSegmentManager.removeGlobalSegmentInfo(defaultSegmentID) qc.globalSegmentManager.removeGlobalSealedSegmentInfo(defaultSegmentID)
err = node.adjustByChangeInfo(segmentChangeInfos) err = node.removeSegments(segmentChangeInfos)
assert.Error(t, err) assert.Error(t, err)
}) })
} }
@ -402,7 +398,7 @@ func TestQueryNode_watchChangeInfo(t *testing.T) {
qc, err := node.queryService.getQueryCollection(defaultCollectionID) qc, err := node.queryService.getQueryCollection(defaultCollectionID)
assert.NoError(t, err) assert.NoError(t, err)
qc.globalSegmentManager.removeGlobalSegmentInfo(defaultSegmentID) qc.globalSegmentManager.removeGlobalSealedSegmentInfo(defaultSegmentID)
go node.watchChangeInfo() go node.watchChangeInfo()

View File

@ -35,10 +35,6 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/funcutil"
) )
const (
queryNodeSegmentMetaPrefix = "queryNode-segmentMeta"
)
// segmentLoader is only responsible for loading the field data from binlog // segmentLoader is only responsible for loading the field data from binlog
type segmentLoader struct { type segmentLoader struct {
historicalReplica ReplicaInterface historicalReplica ReplicaInterface

View File

@ -0,0 +1,6 @@
package util
const (
SegmentMetaPrefix = "queryCoord-segmentMeta"
ChangeInfoMetaPrefix = "queryCoord-sealedSegmentChangeInfo"
)