enhance: [10kcp] Reduce GetRecoveryInfo calls (#37891)

1. Introduce a data view mechanism for DataCoord, which attempts to update
each collection's data view periodically.
2. QueryCoord maintains a cache of data view versions. Before
batch-fetching recovery info, it retrieves all versions and fetches
recovery info only for collections whose versions have changed.
3. Return DataCoord's current data view when fetching RecoveryInfo.
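
A minimal sketch of the QueryCoord-side flow described above (illustrative only; the helper and its interface are assumptions, and the real logic lives in TargetObserver.GetVersionUpdatedCollections below):

// versionSource stands in for the broker used in this change (assumption).
type versionSource interface {
	GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error)
}

// collectionsToRefresh returns only the collections whose data view changed,
// so recovery info is fetched for those instead of for every loaded collection.
func collectionsToRefresh(ctx context.Context, src versionSource, cached map[int64]int64, loaded []int64) ([]int64, error) {
	versions, err := src.GetDataViewVersions(ctx, loaded) // one batched RPC
	if err != nil {
		return nil, err
	}
	stale := make([]int64, 0)
	for _, id := range loaded {
		if v, ok := cached[id]; !ok || versions[id] > v {
			stale = append(stale, id) // only these need GetRecoveryInfoV2
			cached[id] = versions[id]
		}
	}
	return stale, nil
}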

issue: https://github.com/milvus-io/milvus/issues/37743,
https://github.com/milvus-io/milvus/issues/37630

pr: https://github.com/milvus-io/milvus/pull/37863

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2024-11-21 15:43:13 +08:00 committed by GitHub
parent ce8069c0fd
commit bf90e55319
17 changed files with 656 additions and 45 deletions

@@ -0,0 +1,28 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dataview
import "github.com/milvus-io/milvus/internal/proto/datapb"
const InitialDataViewVersion = 0
type DataView struct {
CollectionID int64
Channels map[string]*datapb.VchannelInfo
Segments map[int64]struct{}
Version int64
}
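
For orientation, a hypothetical view for one collection could look like the following (made-up IDs and channel name; Version is a UnixNano timestamp, as pullNewDataView below shows; the time import is assumed):

view := &DataView{
	CollectionID: 100,
	Channels: map[string]*datapb.VchannelInfo{
		"dml_0_100v0": {ChannelName: "dml_0_100v0", CollectionID: 100},
	},
	Segments: map[int64]struct{}{1001: {}, 1002: {}}, // flushed segment IDs as a set
	Version:  time.Now().UnixNano(),
}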

@@ -0,0 +1,33 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dataview
import "sync"
var updateChan chan int64
var initOnce sync.Once
func initUpdateChan() {
initOnce.Do(func() {
updateChan = make(chan int64, 1024)
})
}
// NotifyUpdate is used to trigger an immediate data view update.
func NotifyUpdate(collectionID int64) {
updateChan <- collectionID
}
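
Note that NotifyUpdate blocks once the 1024-slot buffer is full, and blocks forever on the nil channel if no manager has been created yet. A non-blocking variant, sketched here purely as an illustration (tryNotifyUpdate is hypothetical), could drop the hint and rely on the manager's periodic ticker:

// tryNotifyUpdate is a hypothetical non-blocking alternative: when the buffer
// is full the notification is dropped and the periodic update catches up later.
func tryNotifyUpdate(collectionID int64) {
	select {
	case updateChan <- collectionID:
	default:
	}
}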

@@ -0,0 +1,158 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dataview
import (
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type PullNewDataViewFunction func(collectionID int64) (*DataView, error)
type ViewManager interface {
Get(collectionID int64) (*DataView, error)
GetVersion(collectionID int64) int64
Start()
Close()
}
type dataViewManager struct {
pullFn PullNewDataViewFunction
currentViews *typeutil.ConcurrentMap[int64, *DataView]
closeOnce sync.Once
closeChan chan struct{}
}
func NewDataViewManager(pullFn PullNewDataViewFunction) ViewManager {
initUpdateChan()
return &dataViewManager{
pullFn: pullFn,
currentViews: typeutil.NewConcurrentMap[int64, *DataView](),
closeChan: make(chan struct{}),
}
}
func (m *dataViewManager) Get(collectionID int64) (*DataView, error) {
if view, ok := m.currentViews.Get(collectionID); ok {
return view, nil
}
view, err := m.pullFn(collectionID)
if err != nil {
return nil, err
}
m.currentViews.GetOrInsert(collectionID, view)
return view, nil
}
func (m *dataViewManager) GetVersion(collectionID int64) int64 {
if view, ok := m.currentViews.Get(collectionID); ok {
return view.Version
}
return InitialDataViewVersion
}
func (m *dataViewManager) Start() {
ticker := time.NewTicker(paramtable.Get().DataCoordCfg.DataViewUpdateInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-m.closeChan:
log.Info("data view manager exited")
return
case <-ticker.C:
// periodically update all data views
for _, collectionID := range m.currentViews.Keys() {
m.TryUpdateDataView(collectionID)
}
case collectionID := <-updateChan:
m.TryUpdateDataView(collectionID)
}
}
}
func (m *dataViewManager) Close() {
m.closeOnce.Do(func() {
close(m.closeChan)
})
}
func (m *dataViewManager) update(view *DataView) {
// Insert rather than GetOrInsert: the latter would keep the stale view when
// the collection is already present instead of replacing it.
m.currentViews.Insert(view.CollectionID, view)
log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version))
}
func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
newView, err := m.pullFn(collectionID)
if err != nil {
log.Warn("pull new data view failed", zap.Int64("collectionID", collectionID), zap.Error(err))
// notify to trigger pull again
NotifyUpdate(collectionID)
return
}
currentView, ok := m.currentViews.Get(collectionID)
if !ok {
m.currentViews.GetOrInsert(collectionID, newView)
return
}
// no-op if the incoming version is not newer than the current version.
if newView.Version <= currentView.Version {
return
}
// check if channel info has been updated.
for channel, new := range newView.Channels {
current, ok := currentView.Channels[channel]
if !ok {
m.update(newView)
return
}
if !funcutil.SliceSetEqual(new.GetLevelZeroSegmentIds(), current.GetLevelZeroSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetUnflushedSegmentIds(), current.GetUnflushedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetFlushedSegmentIds(), current.GetFlushedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) {
m.update(newView)
return
}
if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) {
m.update(newView)
return
}
// TODO: It might be too frequent.
if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() {
m.update(newView)
return
}
}
// check if segment info has been updated.
if !typeutil.MapEqual(newView.Segments, currentView.Segments) {
m.update(newView)
}
}
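
A minimal wiring sketch, assuming a stand-in pull function (DataCoord's real one is Server.pullNewDataView, shown below):

func exampleWiring() {
	manager := NewDataViewManager(func(collectionID int64) (*DataView, error) {
		// a real pull function assembles channel and segment info for the collection
		return &DataView{CollectionID: collectionID, Version: time.Now().UnixNano()}, nil
	})
	go manager.Start() // Start blocks until Close, so run it on its own goroutine
	defer manager.Close()

	if view, err := manager.Get(100); err == nil { // pulls on first access, cached afterwards
		log.Info("got view", zap.Int64("version", view.Version))
	}
}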

@@ -37,6 +37,7 @@ import (
globalIDAllocator "github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/datacoord/dataview"
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@@ -126,6 +127,7 @@ type Server struct {
importMeta ImportMeta
importScheduler ImportScheduler
importChecker ImportChecker
viewManager dataview.ViewManager
compactionTrigger trigger
compactionHandler compactionPlanContext
@@ -403,6 +405,8 @@ func (s *Server) initDataCoord() error {
s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta, s.buildIndexCh)
s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.importMeta)
s.viewManager = dataview.NewDataViewManager(s.pullNewDataView)
s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager)
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
@@ -723,6 +727,7 @@ func (s *Server) startServerLoop() {
s.startIndexService(s.serverLoopCtx)
go s.importScheduler.Start()
go s.importChecker.Start()
go s.viewManager.Start()
s.garbageCollector.start()
s.syncSegmentsScheduler.Start()
}
@@ -1115,6 +1120,7 @@ func (s *Server) Stop() error {
s.importScheduler.Close()
s.importChecker.Close()
s.viewManager.Close()
s.syncSegmentsScheduler.Stop()
s.stopCompaction()

@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datacoord/dataview"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -806,13 +807,35 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
return resp, nil
}
// GetDataViewVersions retrieves the data view versions of the target collections.
func (s *Server) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) {
log := log.Ctx(ctx).With(zap.Int("numCollections", len(req.GetCollectionIDs())))
log.Info("GetDataViewVersions request received")
resp := &datapb.GetDataViewVersionsResponse{
Status: merr.Success(),
}
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &datapb.GetDataViewVersionsResponse{
Status: merr.Status(err),
}, nil
}
versions := make(map[int64]int64, len(req.GetCollectionIDs()))
for _, id := range req.GetCollectionIDs() {
versions[id] = s.viewManager.GetVersion(id)
}
resp.DataViewVersions = versions
log.Info("GetDataViewVersions done")
return resp, nil
}
// GetRecoveryInfoV2 gets recovery info for segments
// Called by: QueryCoord.
func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryInfoRequestV2) (*datapb.GetRecoveryInfoResponseV2, error) {
collectionID := req.GetCollectionID()
partitionIDs := req.GetPartitionIDs()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
)
@@ -825,13 +848,21 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
Status: merr.Status(err),
}, nil
}
dataView, err := s.viewManager.Get(req.GetCollectionID())
if err != nil {
log.Warn("get data view failed in GetRecoveryInfoV2", zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
channelInfos := make([]*datapb.VchannelInfo, 0, len(dataView.Channels))
for _, info := range dataView.Channels {
channelInfo := typeutil.Clone(info)
// retrieve target partition stats versions
channelInfo.PartitionStatsVersions = lo.PickByKeys(channelInfo.PartitionStatsVersions, req.GetPartitionIDs())
channelInfos = append(channelInfos, channelInfo)
log.Info("datacoord append channelInfo in GetRecoveryInfo",
log.Info("datacoord append channelInfo in GetRecoveryInfoV2",
zap.String("channel", channelInfo.GetChannelName()),
zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())),
zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())),
@@ -839,43 +870,17 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())),
zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())),
)
}
segmentInfos := make([]*datapb.SegmentInfo, 0)
for id := range dataView.Segments {
segment := s.meta.GetSegment(id)
if segment == nil {
err = merr.WrapErrSegmentNotFound(id)
log.Warn("failed to get segment", zap.Int64("segmentID", id))
resp.Status = merr.Status(err)
return resp, nil
}
rowCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo)
if rowCount != segment.NumOfRows && rowCount > 0 {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
@@ -885,7 +890,6 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
} else {
rowCount = segment.NumOfRows
}
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
ID: segment.ID,
PartitionID: segment.PartitionID,
@@ -901,6 +905,68 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
return resp, nil
}
func (s *Server) pullNewDataView(collectionID int64) (*dataview.DataView, error) {
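// The view version is simply the pull timestamp; TryUpdateDataView treats a
// larger value as newer, so this assumes the wall clock does not go backwards.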
version := time.Now().UnixNano()
log := log.With(
zap.Int64("collectionID", collectionID),
zap.Int64("version", version),
)
channels := s.channelManager.GetChannelsByCollectionID(collectionID)
channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
flushedIDs := make(typeutil.UniqueSet)
for _, ch := range channels {
channelInfo := s.handler.GetQueryVChanPositions(ch, allPartitionID)
channelInfos = append(channelInfos, channelInfo)
log.Info("datacoord append channelInfo in pullNewDataView",
zap.String("channel", channelInfo.GetChannelName()),
zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())),
zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())),
zap.Int("# of dropped segments", len(channelInfo.GetDroppedSegmentIds())),
zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())),
zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())),
)
flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...)
}
segments := make([]int64, 0)
for id := range flushedIDs {
segment := s.meta.GetSegment(id)
if segment == nil {
err := merr.WrapErrSegmentNotFound(id)
log.Warn("failed to get segment", zap.Int64("segmentID", id))
return nil, err
}
// Skip non-flushing, non-flushed and dropped segments.
if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped {
continue
}
// Also skip bulk insert segments.
if segment.GetIsImporting() {
continue
}
binlogs := segment.GetBinlogs()
if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 {
continue
}
segments = append(segments, id)
}
newDV := &dataview.DataView{
CollectionID: collectionID,
Channels: lo.KeyBy(channelInfos, func(v *datapb.VchannelInfo) string {
return v.GetChannelName()
}),
Segments: lo.SliceToMap(segments, func(id int64) (int64, struct{}) {
return id, struct{}{}
}),
Version: version,
}
return newDV, nil
}
// GetFlushedSegments returns all segments that match the provided criteria and are in state Flushed or Dropped (compacted but not GCed yet)
// If requested partition id < 0, ignores the partition id filter
func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {

@@ -305,6 +305,18 @@ func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
})
}
// GetDataViewVersions retrieves the data view versions of the target collections.
func (c *Client) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetDataViewVersionsResponse, error) {
return client.GetDataViewVersions(ctx, req)
})
}
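
A hypothetical call site for the new RPC (fetchViewVersions and the collection IDs are made up; the request shape follows the proto below, and the wrapper above fills in the MsgBase):

func fetchViewVersions(ctx context.Context, client datapb.DataCoordClient) (map[int64]int64, error) {
	resp, err := client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{
		CollectionIDs: []int64{100, 101},
	})
	if err := merr.CheckRPCCall(resp, err); err != nil {
		return nil, err
	}
	// A value of 0 (InitialDataViewVersion) means DataCoord has no view cached yet.
	return resp.GetDataViewVersions(), nil
}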
// GetRecoveryInfo request segment recovery info of collection/partition
//
// ctx is the context to control request deadline and cancellation

@@ -335,6 +335,11 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return s.dataCoord.SaveBinlogPaths(ctx, req)
}
// GetDataViewVersions retrieves the data view versions of the target collections.
func (s *Server) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) {
return s.dataCoord.GetDataViewVersions(ctx, req)
}
// GetRecoveryInfo gets information for recovering channels
func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
return s.dataCoord.GetRecoveryInfo(ctx, req)

@@ -861,6 +861,65 @@ func (_c *MockDataCoord_GetComponentStates_Call) RunAndReturn(run func(context.C
return _c
}
// GetDataViewVersions provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) GetDataViewVersions(_a0 context.Context, _a1 *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for GetDataViewVersions")
}
var r0 *datapb.GetDataViewVersionsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest) *datapb.GetDataViewVersionsResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.GetDataViewVersionsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *datapb.GetDataViewVersionsRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataCoord_GetDataViewVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions'
type MockDataCoord_GetDataViewVersions_Call struct {
*mock.Call
}
// GetDataViewVersions is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.GetDataViewVersionsRequest
func (_e *MockDataCoord_Expecter) GetDataViewVersions(_a0 interface{}, _a1 interface{}) *MockDataCoord_GetDataViewVersions_Call {
return &MockDataCoord_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions", _a0, _a1)}
}
func (_c *MockDataCoord_GetDataViewVersions_Call) Run(run func(_a0 context.Context, _a1 *datapb.GetDataViewVersionsRequest)) *MockDataCoord_GetDataViewVersions_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.GetDataViewVersionsRequest))
})
return _c
}
func (_c *MockDataCoord_GetDataViewVersions_Call) Return(_a0 *datapb.GetDataViewVersionsResponse, _a1 error) *MockDataCoord_GetDataViewVersions_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataCoord_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error)) *MockDataCoord_GetDataViewVersions_Call {
_c.Call.Return(run)
return _c
}
// GetFlushAllState provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) GetFlushAllState(_a0 context.Context, _a1 *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
ret := _m.Called(_a0, _a1)

@@ -1124,6 +1124,80 @@ func (_c *MockDataCoordClient_GetComponentStates_Call) RunAndReturn(run func(con
return _c
}
// GetDataViewVersions provides a mock function with given fields: ctx, in, opts
func (_m *MockDataCoordClient) GetDataViewVersions(ctx context.Context, in *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for GetDataViewVersions")
}
var r0 *datapb.GetDataViewVersionsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) *datapb.GetDataViewVersionsResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.GetDataViewVersionsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataCoordClient_GetDataViewVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions'
type MockDataCoordClient_GetDataViewVersions_Call struct {
*mock.Call
}
// GetDataViewVersions is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.GetDataViewVersionsRequest
// - opts ...grpc.CallOption
func (_e *MockDataCoordClient_Expecter) GetDataViewVersions(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_GetDataViewVersions_Call {
return &MockDataCoordClient_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockDataCoordClient_GetDataViewVersions_Call) Run(run func(ctx context.Context, in *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption)) *MockDataCoordClient_GetDataViewVersions_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*datapb.GetDataViewVersionsRequest), variadicArgs...)
})
return _c
}
func (_c *MockDataCoordClient_GetDataViewVersions_Call) Return(_a0 *datapb.GetDataViewVersionsResponse, _a1 error) *MockDataCoordClient_GetDataViewVersions_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataCoordClient_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error)) *MockDataCoordClient_GetDataViewVersions_Call {
_c.Call.Return(run)
return _c
}
// GetFlushAllState provides a mock function with given fields: ctx, in, opts
func (_m *MockDataCoordClient) GetFlushAllState(ctx context.Context, in *milvuspb.GetFlushAllStateRequest, opts ...grpc.CallOption) (*milvuspb.GetFlushAllStateResponse, error) {
_va := make([]interface{}, len(opts))

@@ -47,6 +47,7 @@ service DataCoord {
rpc GetSegmentInfoChannel(GetSegmentInfoChannelRequest) returns (milvus.StringResponse){}
rpc SaveBinlogPaths(SaveBinlogPathsRequest) returns (common.Status){}
rpc GetDataViewVersions(GetDataViewVersionsRequest) returns (GetDataViewVersionsResponse){}
rpc GetRecoveryInfo(GetRecoveryInfoRequest) returns (GetRecoveryInfoResponse){}
rpc GetRecoveryInfoV2(GetRecoveryInfoRequestV2) returns (GetRecoveryInfoResponseV2){}
rpc GetFlushedSegments(GetFlushedSegmentsRequest) returns(GetFlushedSegmentsResponse){}
@@ -422,6 +423,16 @@ message Binlog {
int64 memory_size = 7;
}
message GetDataViewVersionsRequest {
common.MsgBase base = 1;
repeated int64 collectionIDs = 2;
}
message GetDataViewVersionsResponse {
common.Status status = 1;
map<int64, int64> data_view_versions = 2;
}
message GetRecoveryInfoResponse {
common.Status status = 1;
repeated VchannelInfo channels = 2;

@@ -49,6 +49,7 @@ type Broker interface {
ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error)
GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) ([]*datapb.SegmentInfo, error)
GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error)
GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error)
GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)
DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error)
GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error)
@@ -231,6 +232,26 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection
return recoveryInfo.Channels, recoveryInfo.Binlogs, nil
}
// GetDataViewVersions retrieves the data view versions of the target collections.
func (broker *CoordinatorBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
log := log.Ctx(ctx).With(zap.Int("numCollection", len(collectionIDs)))
req := &datapb.GetDataViewVersionsRequest{
Base: commonpbutil.NewMsgBase(
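// NOTE: this reuses MsgType_GetRecoveryInfo; presumably no dedicated
// msg type was introduced for GetDataViewVersions in this change.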
commonpbutil.WithMsgType(commonpb.MsgType_GetRecoveryInfo),
),
CollectionIDs: collectionIDs,
}
resp, err := broker.dataCoord.GetDataViewVersions(ctx, req)
if err = merr.CheckRPCCall(resp, err); err != nil {
log.Warn("GetDataViewVersions failed", zap.Error(err))
return nil, err
}
return resp.GetDataViewVersions(), nil
}
func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()

@@ -202,6 +202,65 @@ func (_c *MockBroker_GetCollectionLoadInfo_Call) RunAndReturn(run func(context.C
return _c
}
// GetDataViewVersions provides a mock function with given fields: ctx, collectionIDs
func (_m *MockBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) {
ret := _m.Called(ctx, collectionIDs)
if len(ret) == 0 {
panic("no return value specified for GetDataViewVersions")
}
var r0 map[int64]int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, []int64) (map[int64]int64, error)); ok {
return rf(ctx, collectionIDs)
}
if rf, ok := ret.Get(0).(func(context.Context, []int64) map[int64]int64); ok {
r0 = rf(ctx, collectionIDs)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]int64)
}
}
if rf, ok := ret.Get(1).(func(context.Context, []int64) error); ok {
r1 = rf(ctx, collectionIDs)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockBroker_GetDataViewVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions'
type MockBroker_GetDataViewVersions_Call struct {
*mock.Call
}
// GetDataViewVersions is a helper method to define mock.On call
// - ctx context.Context
// - collectionIDs []int64
func (_e *MockBroker_Expecter) GetDataViewVersions(ctx interface{}, collectionIDs interface{}) *MockBroker_GetDataViewVersions_Call {
return &MockBroker_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions", ctx, collectionIDs)}
}
func (_c *MockBroker_GetDataViewVersions_Call) Run(run func(ctx context.Context, collectionIDs []int64)) *MockBroker_GetDataViewVersions_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]int64))
})
return _c
}
func (_c *MockBroker_GetDataViewVersions_Call) Return(_a0 map[int64]int64, _a1 error) *MockBroker_GetDataViewVersions_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockBroker_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, []int64) (map[int64]int64, error)) *MockBroker_GetDataViewVersions_Call {
_c.Call.Return(run)
return _c
}
// GetIndexInfo provides a mock function with given fields: ctx, collectionID, segmentIDs
func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segmentIDs ...int64) (map[int64][]*querypb.FieldIndexInfo, error) {
_va := make([]interface{}, len(segmentIDs))

@@ -40,6 +40,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const InitialDataViewVersion = 0
type targetOp int
func (op *targetOp) String() string {
@@ -93,6 +95,8 @@ type TargetObserver struct {
// loadedDispatcher updates targets for loaded collections.
loadedDispatcher *taskDispatcher[int64]
dataViewVersions *typeutil.ConcurrentMap[int64, int64]
keylocks *lock.KeyLock[int64]
startOnce sync.Once
@@ -118,6 +122,7 @@ func NewTargetObserver(
updateChan: make(chan targetUpdateRequest, 10),
readyNotifiers: make(map[int64][]chan struct{}),
initChan: make(chan initRequest),
dataViewVersions: typeutil.NewConcurrentMap[int64, int64](),
keylocks: lock.NewKeyLock[int64](),
}
@@ -176,13 +181,8 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
case <-ticker.C:
ob.clean()
versionUpdatedCollections := ob.GetVersionUpdatedCollections()
ob.loadedDispatcher.AddTask(versionUpdatedCollections...)
case req := <-ob.updateChan:
log.Info("manually trigger update target",
@@ -227,6 +227,41 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
}
}
func (ob *TargetObserver) GetVersionUpdatedCollections() []int64 {
loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) {
if collection.GetStatus() == querypb.LoadStatus_Loaded {
return collection.GetCollectionID(), true
}
return 0, false
})
versions, err := ob.broker.GetDataViewVersions(context.Background(), loaded)
if err != nil {
log.Warn("GetDataViewVersions from dc failed", zap.Error(err))
return nil
}
var (
staleCnt int
updatedCnt int
)
ret := make([]int64, 0)
for _, id := range loaded {
new := versions[id]
current, ok := ob.dataViewVersions.Get(id)
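// A version equal to InitialDataViewVersion means DataCoord has no view cached
// for this collection yet; treat it as updated so the target is still refreshed.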
if !ok || new == InitialDataViewVersion || new > current {
ret = append(ret, id)
ob.dataViewVersions.GetOrInsert(id, new)
updatedCnt++
continue
}
staleCnt++
}
log.Info("get version updated collections done", zap.Int("totalCnt", len(loaded)),
zap.Int("staleCnt", staleCnt), zap.Int("updatedCnt", updatedCnt))
return ret
}
// Check whether the provided collection has a current target.
// If not, submit an async task into dispatcher.
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool {
@@ -311,6 +346,14 @@ func (ob *TargetObserver) ReleasePartition(collectionID int64, partitionID ...in
func (ob *TargetObserver) clean() {
collectionSet := typeutil.NewUniqueSet(ob.meta.GetAll()...)
// for collections that have been dropped/released, clear the data view version cache
ob.dataViewVersions.Range(func(collectionID int64, _ int64) bool {
if !collectionSet.Contain(collectionID) {
ob.dataViewVersions.Remove(collectionID)
}
return true
})
// for collections that have been removed from the target, try to clear nextTargetLastUpdate
ob.nextTargetLastUpdate.Range(func(collectionID int64, _ time.Time) bool {
if !collectionSet.Contain(collectionID) {
@@ -352,6 +395,8 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error {
if err != nil {
log.Warn("failed to update next target for collection",
zap.Error(err))
// updating next target failed, remove the cached data view version so the next check retries
ob.dataViewVersions.Remove(collectionID)
return err
}
ob.updateNextTargetTimestamp(collectionID)

@@ -182,6 +182,14 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
suite.broker.EXPECT().
GetRecoveryInfoV2(mock.Anything, mock.Anything).
Return(suite.nextTargetChannels, suite.nextTargetSegments, nil)
suite.broker.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) {
versions := make(map[int64]int64)
for _, collectionID := range collectionIDs {
versions[collectionID] = InitialDataViewVersion
}
return versions, nil
})
suite.Eventually(func() bool {
return len(suite.targetMgr.GetSealedSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 3 &&

@@ -1096,6 +1096,8 @@ func (suite *ServiceSuite) TestRefreshCollection() {
suite.ErrorIs(err, merr.ErrCollectionNotLoaded)
}
suite.expectGetDataViewVersions()
// Test load all collections
suite.loadAll()
@@ -1924,6 +1926,17 @@ func (suite *ServiceSuite) assertSegments(collection int64, segments []*querypb.
return true
}
func (suite *ServiceSuite) expectGetDataViewVersions() {
suite.broker.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) {
versions := make(map[int64]int64)
for _, collectionID := range collectionIDs {
versions[collectionID] = observers.InitialDataViewVersion
}
return versions, nil
})
}
func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
vChannels := []*datapb.VchannelInfo{}

@@ -3252,6 +3252,9 @@ type dataCoordConfig struct {
ClusteringCompactionSlotUsage ParamItem `refreshable:"true"`
MixCompactionSlotUsage ParamItem `refreshable:"true"`
L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"`
// data view
DataViewUpdateInterval ParamItem `refreshable:"true"`
}
func (p *dataCoordConfig) init(base *BaseTable) {
@@ -4094,6 +4097,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Export: true,
}
p.L0DeleteCompactionSlotUsage.Init(base.mgr)
p.DataViewUpdateInterval = ParamItem{
Key: "dataCoord.dataView.updateInterval",
Version: "2.5.0",
Doc: "The interval (in seconds) for trying to update the data view of all collections.",
DefaultValue: "10",
PanicIfEmpty: false,
Export: false,
}
p.DataViewUpdateInterval.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

@@ -7,7 +7,7 @@ import (
)
// MapEqual returns true if the two maps contain the same keys and values
func MapEqual[K, V comparable](left, right map[K]V) bool {
if len(left) != len(right) {
return false
}