mirror of https://github.com/milvus-io/milvus.git
Delete useless watchGlobalSegmentMeta (#13914)
Signed-off-by: xige-16 <xi.ge@zilliz.com>pull/14016/head
parent
dfe06841fb
commit
ef96b40dd4
|
@ -20,20 +20,15 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/segcorepb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util"
|
||||
)
|
||||
|
||||
// historical is in charge of historical data in query node
|
||||
|
@ -65,7 +60,6 @@ func newHistorical(ctx context.Context,
|
|||
}
|
||||
|
||||
func (h *historical) start() {
|
||||
go h.watchGlobalSegmentMeta()
|
||||
}
|
||||
|
||||
// close would release all resources in historical
|
||||
|
@ -74,57 +68,6 @@ func (h *historical) close() {
|
|||
h.replica.freeAll()
|
||||
}
|
||||
|
||||
func (h *historical) watchGlobalSegmentMeta() {
|
||||
log.Debug("query node watchGlobalSegmentMeta start")
|
||||
watchChan := h.etcdKV.WatchWithPrefix(util.SegmentMetaPrefix)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-h.ctx.Done():
|
||||
log.Debug("query node watchGlobalSegmentMeta close")
|
||||
return
|
||||
case resp := <-watchChan:
|
||||
for _, event := range resp.Events {
|
||||
segmentID, err := strconv.ParseInt(filepath.Base(string(event.Kv.Key)), 10, 64)
|
||||
if err != nil {
|
||||
log.Warn("watchGlobalSegmentMeta failed", zap.Any("error", err.Error()))
|
||||
continue
|
||||
}
|
||||
switch event.Type {
|
||||
case mvccpb.PUT:
|
||||
log.Debug("globalSealedSegments add segment",
|
||||
zap.Any("segmentID", segmentID),
|
||||
)
|
||||
segmentInfo := &querypb.SegmentInfo{}
|
||||
err = proto.Unmarshal(event.Kv.Value, segmentInfo)
|
||||
if err != nil {
|
||||
log.Warn("watchGlobalSegmentMeta failed", zap.Any("error", err.Error()))
|
||||
continue
|
||||
}
|
||||
h.addGlobalSegmentInfo(segmentID, segmentInfo)
|
||||
case mvccpb.DELETE:
|
||||
log.Debug("globalSealedSegments delete segment",
|
||||
zap.Any("segmentID", segmentID),
|
||||
)
|
||||
h.removeGlobalSegmentInfo(segmentID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *historical) addGlobalSegmentInfo(segmentID UniqueID, segmentInfo *querypb.SegmentInfo) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.globalSealedSegments[segmentID] = segmentInfo
|
||||
}
|
||||
|
||||
func (h *historical) removeGlobalSegmentInfo(segmentID UniqueID) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
delete(h.globalSealedSegments, segmentID)
|
||||
}
|
||||
|
||||
func (h *historical) getGlobalSegmentIDsByCollectionID(collectionID UniqueID) []UniqueID {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
|
|
@ -18,83 +18,11 @@ package querynode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestHistorical_GlobalSealedSegments(t *testing.T) {
|
||||
n := newQueryNodeMock()
|
||||
|
||||
// init meta
|
||||
segmentID := UniqueID(0)
|
||||
partitionID := UniqueID(1)
|
||||
collectionID := UniqueID(2)
|
||||
segmentInfo := &querypb.SegmentInfo{
|
||||
SegmentID: segmentID,
|
||||
CollectionID: collectionID,
|
||||
PartitionID: partitionID,
|
||||
}
|
||||
|
||||
emptySegmentCheck := func() {
|
||||
segmentIDs := n.historical.getGlobalSegmentIDsByCollectionID(collectionID)
|
||||
assert.Equal(t, 0, len(segmentIDs))
|
||||
segmentIDs = n.historical.getGlobalSegmentIDsByPartitionIds([]UniqueID{partitionID})
|
||||
assert.Equal(t, 0, len(segmentIDs))
|
||||
}
|
||||
|
||||
// static test
|
||||
emptySegmentCheck()
|
||||
n.historical.addGlobalSegmentInfo(segmentID, segmentInfo)
|
||||
segmentIDs := n.historical.getGlobalSegmentIDsByCollectionID(collectionID)
|
||||
assert.Equal(t, 1, len(segmentIDs))
|
||||
assert.Equal(t, segmentIDs[0], segmentID)
|
||||
|
||||
segmentIDs = n.historical.getGlobalSegmentIDsByPartitionIds([]UniqueID{partitionID})
|
||||
assert.Equal(t, 1, len(segmentIDs))
|
||||
assert.Equal(t, segmentIDs[0], segmentID)
|
||||
|
||||
n.historical.removeGlobalSegmentInfo(segmentID)
|
||||
emptySegmentCheck()
|
||||
|
||||
n.historical.addGlobalSegmentInfo(segmentID, segmentInfo)
|
||||
n.historical.removeGlobalSegmentIDsByCollectionID(collectionID)
|
||||
emptySegmentCheck()
|
||||
|
||||
n.historical.addGlobalSegmentInfo(segmentID, segmentInfo)
|
||||
n.historical.removeGlobalSegmentIDsByPartitionIds([]UniqueID{partitionID})
|
||||
emptySegmentCheck()
|
||||
|
||||
// watch test
|
||||
go n.historical.watchGlobalSegmentMeta()
|
||||
time.Sleep(100 * time.Millisecond) // for etcd latency
|
||||
segmentInfoBytes, err := proto.Marshal(segmentInfo)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, n.etcdKV)
|
||||
segmentKey := util.SegmentMetaPrefix + "/" + strconv.FormatInt(segmentID, 10)
|
||||
err = n.etcdKV.Save(segmentKey, string(segmentInfoBytes))
|
||||
assert.NoError(t, err)
|
||||
|
||||
time.Sleep(200 * time.Millisecond) // for etcd latency
|
||||
segmentIDs = n.historical.getGlobalSegmentIDsByCollectionID(collectionID)
|
||||
assert.Equal(t, 1, len(segmentIDs))
|
||||
assert.Equal(t, segmentIDs[0], segmentID)
|
||||
|
||||
segmentIDs = n.historical.getGlobalSegmentIDsByPartitionIds([]UniqueID{partitionID})
|
||||
assert.Equal(t, 1, len(segmentIDs))
|
||||
assert.Equal(t, segmentIDs[0], segmentID)
|
||||
|
||||
err = n.etcdKV.Remove(segmentKey)
|
||||
assert.NoError(t, err)
|
||||
time.Sleep(100 * time.Millisecond) // for etcd latency
|
||||
emptySegmentCheck()
|
||||
}
|
||||
|
||||
func TestHistorical_Search(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
|
Loading…
Reference in New Issue