mirror of https://github.com/milvus-io/milvus.git
fix: Prevent clone when selecting segments from meta (#30928)
See also #30538. Previously, `SelectSegments` was changed to clone every returned value in order to prevent possible updates to the returned info. Since meta is implemented following COW rules, this should not happen, and any update to a segment should be made on a copy of it. This PR: - Removes the clone for read-only Get segment info - Adds a Segment Operator abstraction for changing a segment - Implements COW for updating MaxRowNum --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> pull/30777/head
parent
385dec3b69
commit
6387403639
|
@ -23,6 +23,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -34,6 +35,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/logutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
@ -335,6 +337,25 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
|
|||
return false
|
||||
})
|
||||
|
||||
// updateSegments brings every entry of segments whose max row number differs
// from newMaxRows up to date: the new value is written through
// t.meta.UpdateSegment with the SetMaxRowCount operator, and the slice entry
// is then replaced by the segment re-read from meta. isDiskAnn is only used
// as a log field.
//
// A merr.ErrSegmentNotFound from UpdateSegment is tolerated; any other error
// stops the loop and is returned.
updateSegments := func(segments []*SegmentInfo, newMaxRows int64, isDiskAnn bool) error {
	for idx, segmentInfo := range segments {
		if segmentInfo.GetMaxRowNum() == newMaxRows {
			continue
		}

		log.Info("segment max row recalculated",
			zap.Int64("segmentID", segmentInfo.GetID()),
			zap.Int64("old max rows", segmentInfo.GetMaxRowNum()),
			zap.Int64("new max rows", newMaxRows),
			zap.Bool("isDiskANN", isDiskAnn),
		)

		err := t.meta.UpdateSegment(segmentInfo.GetID(), SetMaxRowCount(newMaxRows))
		if err != nil && !errors.Is(err, merr.ErrSegmentNotFound) {
			return err
		}

		// The not-found case is accepted above, so the re-read may come back
		// empty. Keep the caller's existing entry then instead of planting a
		// nil pointer in the slice, which later code would dereference.
		// NOTE(review): assumes t.meta.GetSegment returns nil for a segment
		// that is no longer in meta — confirm against its implementation.
		if refreshed := t.meta.GetSegment(segmentInfo.GetID()); refreshed != nil {
			segments[idx] = refreshed
		}
	}
	return nil
}
|
||||
|
||||
allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
|
||||
if allDiskIndex {
|
||||
// Only if all vector fields index type are DiskANN, recalc segment max size here.
|
||||
|
@ -342,13 +363,9 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
|
||||
log.Info("segment max rows recalculated for DiskANN collection",
|
||||
zap.Int64("old max rows", segments[0].GetMaxRowNum()),
|
||||
zap.Int64("new max rows", int64(newMaxRows)))
|
||||
for _, segment := range segments {
|
||||
segment.MaxRowNum = int64(newMaxRows)
|
||||
}
|
||||
err = updateSegments(segments, int64(newMaxRows), true)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
|
||||
|
@ -357,13 +374,9 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
|
|||
if err != nil {
|
||||
return allDiskIndex, err
|
||||
}
|
||||
if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
|
||||
log.Info("segment max rows recalculated for non-DiskANN collection",
|
||||
zap.Int64("old max rows", segments[0].GetMaxRowNum()),
|
||||
zap.Int64("new max rows", int64(newMaxRows)))
|
||||
for _, segment := range segments {
|
||||
segment.MaxRowNum = int64(newMaxRows)
|
||||
}
|
||||
err = updateSegments(segments, int64(newMaxRows), true)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return allDiskIndex, nil
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
|
@ -96,6 +97,10 @@ func Test_compactionTrigger_force(t *testing.T) {
|
|||
globalTrigger *time.Ticker
|
||||
}
|
||||
|
||||
catalog := mocks.NewDataCoordCatalog(t)
|
||||
catalog.EXPECT().AlterSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
|
||||
vecFieldID := int64(201)
|
||||
indexID := int64(1001)
|
||||
tests := []struct {
|
||||
|
@ -109,6 +114,7 @@ func Test_compactionTrigger_force(t *testing.T) {
|
|||
"test force compaction",
|
||||
fields{
|
||||
&meta{
|
||||
catalog: catalog,
|
||||
segments: &SegmentsInfo{
|
||||
map[int64]*SegmentInfo{
|
||||
1: {
|
||||
|
@ -500,7 +506,7 @@ func Test_compactionTrigger_force(t *testing.T) {
|
|||
_, err := tr.forceTriggerCompaction(tt.collectionID)
|
||||
assert.Equal(t, tt.wantErr, err != nil)
|
||||
// expect max row num = 2048*1024*1024/(128*4) = 4194304
|
||||
assert.EqualValues(t, 300, tt.fields.meta.segments.GetSegments()[0].MaxRowNum)
|
||||
assert.EqualValues(t, 4194304, tt.fields.meta.segments.GetSegments()[0].MaxRowNum)
|
||||
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
|
||||
<-spy.spyChan
|
||||
})
|
||||
|
@ -2509,6 +2515,10 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
catalog := mocks.NewDataCoordCatalog(t)
|
||||
catalog.EXPECT().AlterSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
|
@ -2519,6 +2529,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
|
|||
"all mem index",
|
||||
fields{
|
||||
&meta{
|
||||
catalog: catalog,
|
||||
segments: segmentsInfo,
|
||||
collections: map[int64]*collectionInfo{
|
||||
collectionID: info,
|
||||
|
@ -2579,6 +2590,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
|
|||
"all disk index",
|
||||
fields{
|
||||
&meta{
|
||||
catalog: catalog,
|
||||
segments: segmentsInfo,
|
||||
collections: map[int64]*collectionInfo{
|
||||
collectionID: info,
|
||||
|
@ -2639,6 +2651,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
|
|||
"some mme index",
|
||||
fields{
|
||||
&meta{
|
||||
catalog: catalog,
|
||||
segments: segmentsInfo,
|
||||
collections: map[int64]*collectionInfo{
|
||||
collectionID: info,
|
||||
|
|
|
@ -344,7 +344,7 @@ func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo {
|
|||
defer m.RUnlock()
|
||||
segment := m.segments.GetSegment(segID)
|
||||
if segment != nil && isSegmentHealthy(segment) {
|
||||
return segment.Clone()
|
||||
return segment
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -409,6 +409,45 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *meta) UpdateSegment(segmentID int64, operators ...SegmentOperator) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
info := m.segments.GetSegment(segmentID)
|
||||
if info == nil {
|
||||
log.Warn("meta update: UpdateSegment - segment not found",
|
||||
zap.Int64("segmentID", segmentID))
|
||||
|
||||
return merr.WrapErrSegmentNotFound(segmentID)
|
||||
}
|
||||
// Persist segment updates first.
|
||||
cloned := info.Clone()
|
||||
|
||||
var updated bool
|
||||
for _, operator := range operators {
|
||||
updated = updated || operator(cloned)
|
||||
}
|
||||
|
||||
if !updated {
|
||||
log.Warn("meta update:UpdateSegmnt skipped, no update",
|
||||
zap.Int64("segmentID", segmentID),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo}); err != nil {
|
||||
log.Warn("meta update: update segment - failed to alter segments",
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
// Update in-memory meta.
|
||||
m.segments.SetSegment(segmentID, cloned)
|
||||
|
||||
log.Info("meta update: update segment - complete",
|
||||
zap.Int64("segmentID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsetIsImporting removes the `isImporting` flag of a segment.
|
||||
func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
|
||||
log.Debug("meta update: unsetting isImport state of segment",
|
||||
|
@ -909,7 +948,7 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
|
|||
segments := m.segments.GetSegments()
|
||||
for _, info := range segments {
|
||||
if selector(info) {
|
||||
ret = append(ret, info.Clone())
|
||||
ret = append(ret, info)
|
||||
}
|
||||
}
|
||||
return ret
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/testutils"
|
||||
)
|
||||
|
@ -321,6 +322,93 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
|
|||
suite.EqualValues(2, mutation.rowCountAccChange)
|
||||
}
|
||||
|
||||
func (suite *MetaBasicSuite) TestSetSegment() {
|
||||
meta := suite.meta
|
||||
catalog := mocks.NewDataCoordCatalog(suite.T())
|
||||
meta.catalog = catalog
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
suite.Run("normal", func() {
|
||||
segmentID := int64(1000)
|
||||
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
segment := NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: segmentID,
|
||||
MaxRowNum: 30000,
|
||||
CollectionID: suite.collID,
|
||||
InsertChannel: suite.channelName,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
})
|
||||
err := meta.AddSegment(ctx, segment)
|
||||
suite.Require().NoError(err)
|
||||
|
||||
noOp := func(segment *SegmentInfo) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
|
||||
err = meta.UpdateSegment(segmentID, noOp)
|
||||
suite.NoError(err)
|
||||
})
|
||||
|
||||
suite.Run("not_updated", func() {
|
||||
segmentID := int64(1001)
|
||||
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
segment := NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: segmentID,
|
||||
MaxRowNum: 30000,
|
||||
CollectionID: suite.collID,
|
||||
InsertChannel: suite.channelName,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
})
|
||||
err := meta.AddSegment(ctx, segment)
|
||||
suite.Require().NoError(err)
|
||||
|
||||
noOp := func(segment *SegmentInfo) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
err = meta.UpdateSegment(segmentID, noOp)
|
||||
suite.NoError(err)
|
||||
})
|
||||
|
||||
suite.Run("catalog_error", func() {
|
||||
segmentID := int64(1002)
|
||||
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
segment := NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: segmentID,
|
||||
MaxRowNum: 30000,
|
||||
CollectionID: suite.collID,
|
||||
InsertChannel: suite.channelName,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
})
|
||||
err := meta.AddSegment(ctx, segment)
|
||||
suite.Require().NoError(err)
|
||||
|
||||
noOp := func(segment *SegmentInfo) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(errors.New("mocked")).Once()
|
||||
|
||||
err = meta.UpdateSegment(segmentID, noOp)
|
||||
suite.Error(err)
|
||||
})
|
||||
|
||||
suite.Run("segment_not_found", func() {
|
||||
segmentID := int64(1003)
|
||||
|
||||
noOp := func(segment *SegmentInfo) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
err := meta.UpdateSegment(segmentID, noOp)
|
||||
suite.Error(err)
|
||||
suite.ErrorIs(err, merr.ErrSegmentNotFound)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMeta(t *testing.T) {
|
||||
suite.Run(t, new(MetaBasicSuite))
|
||||
suite.Run(t, new(MetaReloadSuite))
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datacoord
|
||||
|
||||
// SegmentOperator is function type to update segment info.
|
||||
type SegmentOperator func(segment *SegmentInfo) bool
|
||||
|
||||
func SetMaxRowCount(maxRow int64) SegmentOperator {
|
||||
return func(segment *SegmentInfo) bool {
|
||||
if segment.MaxRowNum == maxRow {
|
||||
return false
|
||||
}
|
||||
segment.MaxRowNum = maxRow
|
||||
return true
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
)
|
||||
|
||||
type TestSegmentOperatorSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func (s *TestSegmentOperatorSuite) TestSetMaxRowCount() {
|
||||
segment := &SegmentInfo{
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
MaxRowNum: 300,
|
||||
},
|
||||
}
|
||||
|
||||
ops := SetMaxRowCount(20000)
|
||||
updated := ops(segment)
|
||||
s.Require().True(updated)
|
||||
s.EqualValues(20000, segment.GetMaxRowNum())
|
||||
|
||||
updated = ops(segment)
|
||||
s.False(updated)
|
||||
}
|
||||
|
||||
func TestSegmentOperators(t *testing.T) {
|
||||
suite.Run(t, new(TestSegmentOperatorSuite))
|
||||
}
|
|
@ -304,6 +304,8 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
|
|||
}, nil
|
||||
}
|
||||
|
||||
segment = segment.Clone()
|
||||
|
||||
err := binlog.DecompressBinLog(storage.InsertBinlog, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(), segment.GetBinlogs())
|
||||
if err != nil {
|
||||
return &datapb.GetInsertBinlogPathsResponse{
|
||||
|
|
Loading…
Reference in New Issue