fix: [10kcp] Fix data view and add more ut (#37915)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/37981/head
yihao.dai 2024-11-21 21:35:42 +08:00 committed by GitHub
parent 4845e4d679
commit fd30034c77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 339 additions and 12 deletions

View File

@ -18,7 +18,7 @@ package dataview
import "github.com/milvus-io/milvus/internal/proto/datapb"
const InitialDataViewVersion = 0
const InitialDataViewVersion int64 = 0
type DataView struct {
CollectionID int64

View File

@ -33,6 +33,7 @@ type PullNewDataViewFunction func(collectionID int64) (*DataView, error)
type ViewManager interface {
Get(collectionID int64) (*DataView, error)
GetVersion(collectionID int64) int64
Remove(collectionID int64)
Start()
Close()
@ -63,8 +64,12 @@ func (m *dataViewManager) Get(collectionID int64) (*DataView, error) {
if err != nil {
return nil, err
}
m.currentViews.GetOrInsert(collectionID, view)
return view, nil
v, ok := m.currentViews.GetOrInsert(collectionID, view)
if !ok {
log.Info("update new data view", zap.Int64("collectionID", collectionID), zap.Int64("version", view.Version))
}
return v, nil
}
func (m *dataViewManager) GetVersion(collectionID int64) int64 {
@ -74,6 +79,12 @@ func (m *dataViewManager) GetVersion(collectionID int64) int64 {
return InitialDataViewVersion
}
func (m *dataViewManager) Remove(collectionID int64) {
if view, ok := m.currentViews.GetAndRemove(collectionID); ok {
log.Info("data view removed", zap.Int64("collectionID", collectionID), zap.Int64("version", view.Version))
}
}
func (m *dataViewManager) Start() {
ticker := time.NewTicker(paramtable.Get().DataCoordCfg.DataViewUpdateInterval.GetAsDuration(time.Second))
defer ticker.Stop()
@ -100,35 +111,36 @@ func (m *dataViewManager) Close() {
}
func (m *dataViewManager) update(view *DataView) {
_, ok := m.currentViews.GetOrInsert(view.CollectionID, view)
if ok {
log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version))
}
m.currentViews.Insert(view.CollectionID, view)
log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version))
}
func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
newView, err := m.pullFn(collectionID)
if err != nil {
log.Warn("pull new data view failed", zap.Int64("collectionID", collectionID), zap.Error(err))
// notify to trigger pull again
// notify to trigger retry
NotifyUpdate(collectionID)
return
}
currentView, ok := m.currentViews.Get(collectionID)
if !ok {
m.currentViews.GetOrInsert(collectionID, newView)
// update due to data view is empty
m.update(newView)
return
}
// no-op if the incoming version is less than the current version.
if newView.Version <= currentView.Version {
log.Warn("stale version, skip update", zap.Int64("collectionID", collectionID),
zap.Int64("new", newView.Version), zap.Int64("current", currentView.Version))
return
}
// check if channel info has been updated.
for channel, new := range newView.Channels {
current, ok := currentView.Channels[channel]
if !ok {
// update due to channel info is empty
m.update(newView)
return
}
@ -137,22 +149,25 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
!funcutil.SliceSetEqual(new.GetFlushedSegmentIds(), current.GetFlushedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) {
// update due to segments list changed
m.update(newView)
return
}
if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) {
// update due to partition stats changed
m.update(newView)
return
}
// TODO: It might be too frequent.
if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() {
// update due to channel cp advanced
m.update(newView)
return
}
}
// check if segment info has been updated.
if !typeutil.MapEqual(newView.Segments, currentView.Segments) {
m.currentViews.GetOrInsert(collectionID, newView)
// update due to segments list changed
m.update(newView)
}
}

View File

@ -0,0 +1,217 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dataview
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func init() {
paramtable.Init()
}
func TestNewDataViewManager_Get(t *testing.T) {
pullFn := func(collectionID int64) (*DataView, error) {
return &DataView{
CollectionID: collectionID,
Channels: nil,
Segments: nil,
Version: time.Now().UnixNano(),
}, nil
}
manager := NewDataViewManager(pullFn)
collectionID := int64(1)
// No data view
version := manager.GetVersion(collectionID)
assert.Equal(t, InitialDataViewVersion, version)
// Lazy get data view
v1, err := manager.Get(collectionID)
assert.NoError(t, err)
assert.NotEqual(t, InitialDataViewVersion, v1)
version = manager.GetVersion(v1.CollectionID)
assert.Equal(t, v1.Version, version)
// Get again, data view should not update
v2, err := manager.Get(collectionID)
assert.NoError(t, err)
assert.Equal(t, v1, v2)
}
func TestNewDataViewManager_TryUpdateDataView(t *testing.T) {
manager := NewDataViewManager(nil)
go manager.Start()
defer manager.Close()
collectionID := int64(1)
// Update due to data view is empty
v1 := &DataView{
CollectionID: collectionID,
Version: time.Now().UnixNano(),
}
manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) {
return v1, nil
}
NotifyUpdate(collectionID)
assert.Eventually(t, func() bool {
version := manager.GetVersion(collectionID)
return version == v1.Version
}, 1*time.Second, 10*time.Millisecond)
// Update due to channel info is empty
v2 := &DataView{
CollectionID: collectionID,
Channels: map[string]*datapb.VchannelInfo{"ch0": {
CollectionID: collectionID,
ChannelName: "ch0",
}},
Version: time.Now().UnixNano(),
}
manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) {
return v2, nil
}
NotifyUpdate(collectionID)
assert.Eventually(t, func() bool {
version := manager.GetVersion(collectionID)
return version == v2.Version
}, 1*time.Second, 10*time.Millisecond)
// Update due to segments list changed
v3 := &DataView{
CollectionID: collectionID,
Channels: map[string]*datapb.VchannelInfo{"ch0": {
CollectionID: collectionID,
ChannelName: "ch0",
UnflushedSegmentIds: []int64{100, 200},
}},
Version: time.Now().UnixNano(),
}
manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) {
return v3, nil
}
NotifyUpdate(collectionID)
assert.Eventually(t, func() bool {
version := manager.GetVersion(collectionID)
return version == v3.Version
}, 1*time.Second, 10*time.Millisecond)
// Update due to partition stats changed
v4 := &DataView{
CollectionID: collectionID,
Channels: map[string]*datapb.VchannelInfo{"ch0": {
CollectionID: collectionID,
ChannelName: "ch0",
SeekPosition: &msgpb.MsgPosition{
Timestamp: uint64(time.Now().UnixNano()),
},
UnflushedSegmentIds: []int64{100, 200},
PartitionStatsVersions: map[int64]int64{1000: 2000},
}},
Version: time.Now().UnixNano(),
}
manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) {
return v4, nil
}
NotifyUpdate(collectionID)
assert.Eventually(t, func() bool {
version := manager.GetVersion(collectionID)
return version == v4.Version
}, 1*time.Second, 10*time.Millisecond)
// Update due to channel cp advanced
v5 := &DataView{
CollectionID: collectionID,
Channels: map[string]*datapb.VchannelInfo{"ch0": {
CollectionID: collectionID,
ChannelName: "ch0",
SeekPosition: &msgpb.MsgPosition{
Timestamp: uint64(time.Now().UnixNano()),
},
UnflushedSegmentIds: []int64{100, 200},
PartitionStatsVersions: map[int64]int64{1000: 2000},
}},
Version: time.Now().UnixNano(),
}
manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) {
return v5, nil
}
NotifyUpdate(collectionID)
assert.Eventually(t, func() bool {
version := manager.GetVersion(collectionID)
return version == v5.Version
}, 1*time.Second, 10*time.Millisecond)
// Update due to segments list changed
v6 := &DataView{
CollectionID: collectionID,
Channels: map[string]*datapb.VchannelInfo{"ch0": {
CollectionID: collectionID,
ChannelName: "ch0",
SeekPosition: &msgpb.MsgPosition{
Timestamp: v5.Channels["ch0"].GetSeekPosition().GetTimestamp(),
},
UnflushedSegmentIds: []int64{100, 200},
PartitionStatsVersions: map[int64]int64{1000: 2000},
}},
Segments: map[int64]struct{}{
300: {},
},
Version: time.Now().UnixNano(),
}
manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) {
return v6, nil
}
NotifyUpdate(collectionID)
assert.Eventually(t, func() bool {
version := manager.GetVersion(collectionID)
return version == v6.Version
}, 1*time.Second, 10*time.Millisecond)
// Won't update anymore
NotifyUpdate(collectionID)
assert.Never(t, func() bool {
version := manager.GetVersion(collectionID)
return version != v6.Version
}, 100*time.Millisecond, 10*time.Millisecond)
}
func TestNewDataViewManager_TryUpdateDataView_Failed(t *testing.T) {
manager := NewDataViewManager(nil)
go manager.Start()
defer manager.Close()
collectionID := int64(1)
manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) {
return nil, fmt.Errorf("mock err")
}
NotifyUpdate(collectionID)
assert.Never(t, func() bool {
version := manager.GetVersion(collectionID)
return version > InitialDataViewVersion
}, 100*time.Millisecond, 10*time.Millisecond)
}

View File

@ -474,5 +474,8 @@ func (h *ServerHandler) FinishDropChannel(channel string, collectionID int64) er
// clean collection info cache when meet drop collection info
h.s.meta.DropCollection(collectionID)
// clean data view
h.s.viewManager.Remove(collectionID)
return nil
}

View File

@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/dataview"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
mocks2 "github.com/milvus-io/milvus/internal/mocks"
@ -1060,6 +1061,39 @@ func TestServer_GcConfirm(t *testing.T) {
})
}
func TestGetDataViewVersions(t *testing.T) {
t.Run("server not healthy", func(t *testing.T) {
svr := newTestServer(t)
closeTestServer(t, svr)
resp, err := svr.GetDataViewVersions(context.TODO(), &datapb.GetDataViewVersionsRequest{})
assert.NoError(t, err)
err = merr.Error(resp.GetStatus())
assert.ErrorIs(t, err, merr.ErrServiceNotReady)
})
t.Run("normal", func(t *testing.T) {
svr := newTestServer(t)
defer closeTestServer(t, svr)
pullFn := func(collectionID int64) (*dataview.DataView, error) {
return &dataview.DataView{
CollectionID: collectionID,
Version: time.Now().UnixNano(),
}, nil
}
manager := dataview.NewDataViewManager(pullFn)
svr.viewManager = manager
req := &datapb.GetDataViewVersionsRequest{
CollectionIDs: []int64{100, 200, 300},
}
resp, err := svr.GetDataViewVersions(context.TODO(), req)
assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetCode())
assert.EqualValues(t, 3, len(resp.GetDataViewVersions()))
})
}
func TestGetRecoveryInfoV2(t *testing.T) {
t.Run("test get recovery info with no segments", func(t *testing.T) {
svr := newTestServer(t)

View File

@ -692,6 +692,57 @@ func Test_SaveBinlogPaths(t *testing.T) {
assert.ErrorIs(t, err, context.DeadlineExceeded)
}
func Test_GetDataViewVersions(t *testing.T) {
paramtable.Init()
ctx := context.Background()
client, err := NewClient(ctx)
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
mockDC := mocks.NewMockDataCoordClient(t)
mockGrpcClient := mocks.NewMockGrpcClient[datapb.DataCoordClient](t)
mockGrpcClient.EXPECT().Close().Return(nil)
mockGrpcClient.EXPECT().ReCall(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, f func(datapb.DataCoordClient) (interface{}, error)) (interface{}, error) {
return f(mockDC)
})
client.(*Client).grpcClient = mockGrpcClient
// test success
mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{
Status: merr.Success(),
}, nil)
_, err = client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{})
assert.Nil(t, err)
// test return error status
mockDC.ExpectedCalls = nil
mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{
Status: merr.Status(merr.ErrServiceNotReady),
}, nil)
rsp, err := client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{})
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
assert.Nil(t, err)
// test return error
mockDC.ExpectedCalls = nil
mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{
Status: merr.Success(),
}, mockErr)
_, err = client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{})
assert.NotNil(t, err)
// test ctx done
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
time.Sleep(20 * time.Millisecond)
_, err = client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{})
assert.ErrorIs(t, err, context.DeadlineExceeded)
}
func Test_GetRecoveryInfo(t *testing.T) {
paramtable.Init()

View File

@ -130,6 +130,13 @@ func Test_NewServer(t *testing.T) {
assert.NotNil(t, resp)
})
t.Run("GetDataViewVersions", func(t *testing.T) {
mockDataCoord.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{}, nil)
resp, err := server.GetDataViewVersions(ctx, nil)
assert.NoError(t, err)
assert.NotNil(t, resp)
})
t.Run("GetRecoveryInfo", func(t *testing.T) {
mockDataCoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything).Return(&datapb.GetRecoveryInfoResponse{}, nil)
resp, err := server.GetRecoveryInfo(ctx, nil)