diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 378e5fa76d..d7aa23ae7f 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -23,6 +23,7 @@ import (
     "sync"
     "time"
 
+    "github.com/cockroachdb/errors"
     "github.com/samber/lo"
     "go.uber.org/zap"
 
@@ -34,6 +35,7 @@ import (
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/indexparamcheck"
     "github.com/milvus-io/milvus/pkg/util/logutil"
+    "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/milvus-io/milvus/pkg/util/tsoutil"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -335,6 +337,25 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
         return false
     })
 
+    updateSegments := func(segments []*SegmentInfo, newMaxRows int64, isDiskAnn bool) error {
+        for idx, segmentInfo := range segments {
+            if segmentInfo.GetMaxRowNum() != newMaxRows {
+                log.Info("segment max rows recalculated",
+                    zap.Int64("segmentID", segmentInfo.GetID()),
+                    zap.Int64("old max rows", segmentInfo.GetMaxRowNum()),
+                    zap.Int64("new max rows", newMaxRows),
+                    zap.Bool("isDiskANN", isDiskAnn),
+                )
+                err := t.meta.UpdateSegment(segmentInfo.GetID(), SetMaxRowCount(newMaxRows))
+                if err != nil && !errors.Is(err, merr.ErrSegmentNotFound) {
+                    return err
+                }
+                segments[idx] = t.meta.GetSegment(segmentInfo.GetID())
+            }
+        }
+        return nil
+    }
+
     allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
     if allDiskIndex {
         // Only if all vector fields index type are DiskANN, recalc segment max size here.
@@ -342,13 +363,9 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
         if err != nil {
             return false, err
         }
-        if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
-            log.Info("segment max rows recalculated for DiskANN collection",
-                zap.Int64("old max rows", segments[0].GetMaxRowNum()),
-                zap.Int64("new max rows", int64(newMaxRows)))
-            for _, segment := range segments {
-                segment.MaxRowNum = int64(newMaxRows)
-            }
+        err = updateSegments(segments, int64(newMaxRows), true)
+        if err != nil {
+            return false, err
         }
     }
     // If some vector fields index type are not DiskANN, recalc segment max size using default policy.
@@ -357,13 +374,9 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
         if err != nil {
             return allDiskIndex, err
         }
-        if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
-            log.Info("segment max rows recalculated for non-DiskANN collection",
-                zap.Int64("old max rows", segments[0].GetMaxRowNum()),
-                zap.Int64("new max rows", int64(newMaxRows)))
-            for _, segment := range segments {
-                segment.MaxRowNum = int64(newMaxRows)
-            }
+        err = updateSegments(segments, int64(newMaxRows), false)
+        if err != nil {
+            return false, err
         }
     }
     return allDiskIndex, nil
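Note for reviewers: the `updateSegments` closure above deliberately tolerates `merr.ErrSegmentNotFound` (a segment can be dropped between listing and update), while any other failure aborts the recalculation. A minimal, self-contained sketch of that tolerance pattern, using github.com/cockroachdb/errors as the PR does; the sentinel and the `update` function below are illustrative stand-ins, not Milvus code:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// errNotFound stands in for merr.ErrSegmentNotFound.
var errNotFound = errors.New("segment not found")

// update stands in for meta.UpdateSegment; segment 2 pretends to be dropped.
func update(id int64) error {
	if id == 2 {
		return errors.Wrapf(errNotFound, "segment %d", id)
	}
	return nil
}

func main() {
	for _, id := range []int64{1, 2, 3} {
		if err := update(id); err != nil && !errors.Is(err, errNotFound) {
			fmt.Println("abort:", err)
			return
		}
	}
	fmt.Println("done") // dropped segments are skipped rather than failing the trigger
}
```

Wrapping with errors.Wrapf keeps the sentinel matchable by errors.Is while adding context to the message.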
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index 6faef07a0f..dc946a903e 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -32,6 +32,7 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/metastore/model"
+    "github.com/milvus-io/milvus/internal/mocks"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/util/indexparamcheck"
@@ -96,6 +97,10 @@ func Test_compactionTrigger_force(t *testing.T) {
         globalTrigger      *time.Ticker
     }
 
+    catalog := mocks.NewDataCoordCatalog(t)
+    catalog.EXPECT().AlterSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+    catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe()
+
     vecFieldID := int64(201)
     indexID := int64(1001)
     tests := []struct {
@@ -109,6 +114,7 @@ func Test_compactionTrigger_force(t *testing.T) {
             "test force compaction",
             fields{
                 &meta{
+                    catalog:  catalog,
                     segments: &SegmentsInfo{
                         map[int64]*SegmentInfo{
                             1: {
@@ -500,7 +506,7 @@ func Test_compactionTrigger_force(t *testing.T) {
             _, err := tr.forceTriggerCompaction(tt.collectionID)
             assert.Equal(t, tt.wantErr, err != nil)
             // expect max row num = 2048*1024*1024/(128*4) = 4194304
-            assert.EqualValues(t, 300, tt.fields.meta.segments.GetSegments()[0].MaxRowNum)
+            assert.EqualValues(t, 4194304, tt.fields.meta.segments.GetSegments()[0].MaxRowNum)
             spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
             <-spy.spyChan
         })
@@ -2509,6 +2515,10 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
         },
     }
 
+    catalog := mocks.NewDataCoordCatalog(t)
+    catalog.EXPECT().AlterSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+    catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe()
+
     tests := []struct {
         name   string
         fields fields
@@ -2519,6 +2529,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
             "all mem index",
             fields{
                 &meta{
+                    catalog:  catalog,
                     segments: segmentsInfo,
                     collections: map[int64]*collectionInfo{
                         collectionID: info,
@@ -2579,6 +2590,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
             "all disk index",
             fields{
                 &meta{
+                    catalog:  catalog,
                     segments: segmentsInfo,
                     collections: map[int64]*collectionInfo{
                         collectionID: info,
@@ -2639,6 +2651,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
             "some mme index",
             fields{
                 &meta{
+                    catalog:  catalog,
                     segments: segmentsInfo,
                     collections: map[int64]*collectionInfo{
                         collectionID: info,
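The new expected value in Test_compactionTrigger_force follows directly from the arithmetic in the test's own comment: a 2048 MB DiskANN segment divided by the size of one 128-dimensional float32 row. A quick runnable check:

```go
package main

import "fmt"

func main() {
	// Worked check of the assertion in Test_compactionTrigger_force:
	// 2048*1024*1024 / (128*4) = 4194304, per the test's comment.
	const segmentBytes = 2048 * 1024 * 1024 // DiskANN segment max size used by the test
	const rowBytes = 128 * 4                // 128 dimensions x 4 bytes per float32
	fmt.Println(segmentBytes / rowBytes)    // 4194304
}
```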
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index 3d88ada594..b67d08b2ff 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -344,7 +344,7 @@ func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo {
     defer m.RUnlock()
     segment := m.segments.GetSegment(segID)
     if segment != nil && isSegmentHealthy(segment) {
-        return segment.Clone()
+        return segment
     }
     return nil
 }
@@ -409,6 +409,45 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) error {
     return nil
 }
 
+func (m *meta) UpdateSegment(segmentID int64, operators ...SegmentOperator) error {
+    m.Lock()
+    defer m.Unlock()
+    info := m.segments.GetSegment(segmentID)
+    if info == nil {
+        log.Warn("meta update: UpdateSegment - segment not found",
+            zap.Int64("segmentID", segmentID))
+
+        return merr.WrapErrSegmentNotFound(segmentID)
+    }
+    // Apply operators on a clone; persist it before touching in-memory meta.
+    cloned := info.Clone()
+
+    var updated bool
+    for _, operator := range operators {
+        // Evaluate the operator first so an earlier update cannot short-circuit it.
+        updated = operator(cloned) || updated
+    }
+
+    if !updated {
+        log.Warn("meta update: UpdateSegment skipped, no update",
+            zap.Int64("segmentID", segmentID),
+        )
+        return nil
+    }
+
+    if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo}); err != nil {
+        log.Warn("meta update: update segment - failed to alter segments",
+            zap.Int64("segmentID", segmentID),
+            zap.Error(err))
+        return err
+    }
+    // Update in-memory meta.
+    m.segments.SetSegment(segmentID, cloned)
+
+    log.Info("meta update: update segment - complete",
+        zap.Int64("segmentID", segmentID))
+    return nil
+}
+
 // UnsetIsImporting removes the `isImporting` flag of a segment.
 func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
     log.Debug("meta update: unsetting isImport state of segment",
@@ -909,7 +948,7 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
     segments := m.segments.GetSegments()
     for _, info := range segments {
         if selector(info) {
-            ret = append(ret, info.Clone())
+            ret = append(ret, info)
         }
     }
     return ret
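meta.UpdateSegment above follows a copy-on-write order: clone the record, apply the operators to the clone, persist via the catalog, and only then publish the clone in memory, so a failed catalog write never leaves the in-memory meta ahead of the metastore. A toy sketch of that sequence (simplified types, not the actual Milvus signatures):

```go
package main

import (
	"errors"
	"fmt"
)

type record struct{ maxRows int64 }

// operator mirrors SegmentOperator: mutate the record, report whether anything changed.
type operator func(*record) bool

var errNotFound = errors.New("not found")

// updateCOW mirrors the order of operations in meta.UpdateSegment.
func updateCOW(store map[int64]*record, persist func(*record) error, id int64, ops ...operator) error {
	old, ok := store[id]
	if !ok {
		return errNotFound
	}
	cloned := *old // readers keep seeing *old until the swap below
	updated := false
	for _, op := range ops {
		updated = op(&cloned) || updated // operator first, so || cannot skip it
	}
	if !updated {
		return nil // nothing changed; skip the catalog write
	}
	if err := persist(&cloned); err != nil {
		return err // memory untouched, so meta and metastore stay consistent
	}
	store[id] = &cloned // publish only after the write succeeded
	return nil
}

func main() {
	store := map[int64]*record{1: {maxRows: 300}}
	persist := func(*record) error { return nil } // stands in for catalog.AlterSegments
	err := updateCOW(store, persist, 1, func(r *record) bool {
		if r.maxRows == 4096 {
			return false
		}
		r.maxRows = 4096
		return true
	})
	fmt.Println(err, store[1].maxRows) // <nil> 4096
}
```

Note the `op(&cloned) || updated` order: evaluating the operator before the existing flag keeps Go's short-circuit evaluation from skipping later operators once one of them reports a change.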
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index 7a9bbd621c..a079bbf256 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -36,6 +36,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/metrics"
+    "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/milvus-io/milvus/pkg/util/paramtable"
     "github.com/milvus-io/milvus/pkg/util/testutils"
 )
@@ -321,6 +322,93 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
     suite.EqualValues(2, mutation.rowCountAccChange)
 }
 
+func (suite *MetaBasicSuite) TestUpdateSegment() {
+    meta := suite.meta
+    catalog := mocks.NewDataCoordCatalog(suite.T())
+    meta.catalog = catalog
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    suite.Run("normal", func() {
+        segmentID := int64(1000)
+        catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once()
+        segment := NewSegmentInfo(&datapb.SegmentInfo{
+            ID:            segmentID,
+            MaxRowNum:     30000,
+            CollectionID:  suite.collID,
+            InsertChannel: suite.channelName,
+            State:         commonpb.SegmentState_Flushed,
+        })
+        err := meta.AddSegment(ctx, segment)
+        suite.Require().NoError(err)
+
+        noOp := func(segment *SegmentInfo) bool {
+            return true
+        }
+
+        catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Once()
+
+        err = meta.UpdateSegment(segmentID, noOp)
+        suite.NoError(err)
+    })
+
+    suite.Run("not_updated", func() {
+        segmentID := int64(1001)
+        catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once()
+        segment := NewSegmentInfo(&datapb.SegmentInfo{
+            ID:            segmentID,
+            MaxRowNum:     30000,
+            CollectionID:  suite.collID,
+            InsertChannel: suite.channelName,
+            State:         commonpb.SegmentState_Flushed,
+        })
+        err := meta.AddSegment(ctx, segment)
+        suite.Require().NoError(err)
+
+        noOp := func(segment *SegmentInfo) bool {
+            return false
+        }
+
+        err = meta.UpdateSegment(segmentID, noOp)
+        suite.NoError(err)
+    })
+
+    suite.Run("catalog_error", func() {
+        segmentID := int64(1002)
+        catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once()
+        segment := NewSegmentInfo(&datapb.SegmentInfo{
+            ID:            segmentID,
+            MaxRowNum:     30000,
+            CollectionID:  suite.collID,
+            InsertChannel: suite.channelName,
+            State:         commonpb.SegmentState_Flushed,
+        })
+        err := meta.AddSegment(ctx, segment)
+        suite.Require().NoError(err)
+
+        noOp := func(segment *SegmentInfo) bool {
+            return true
+        }
+
+        catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(errors.New("mocked")).Once()
+
+        err = meta.UpdateSegment(segmentID, noOp)
+        suite.Error(err)
+    })
+
+    suite.Run("segment_not_found", func() {
+        segmentID := int64(1003)
+
+        noOp := func(segment *SegmentInfo) bool {
+            return true
+        }
+
+        err := meta.UpdateSegment(segmentID, noOp)
+        suite.Error(err)
+        suite.ErrorIs(err, merr.ErrSegmentNotFound)
+    })
+}
+
 func TestMeta(t *testing.T) {
     suite.Run(t, new(MetaBasicSuite))
     suite.Run(t, new(MetaReloadSuite))
diff --git a/internal/datacoord/segment_operator.go b/internal/datacoord/segment_operator.go
new file mode 100644
index 0000000000..1e2c1fe4e7
--- /dev/null
+++ b/internal/datacoord/segment_operator.go
@@ -0,0 +1,30 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package datacoord
+
+// SegmentOperator is a function type that updates segment info in place.
+type SegmentOperator func(segment *SegmentInfo) bool
+
+func SetMaxRowCount(maxRow int64) SegmentOperator {
+    return func(segment *SegmentInfo) bool {
+        if segment.MaxRowNum == maxRow {
+            return false
+        }
+        segment.MaxRowNum = maxRow
+        return true
+    }
+}
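SegmentOperator returning a bool is what lets UpdateSegment skip the catalog write when every operator is a no-op, and the variadic signature lets several operators share one write. A toy sketch of how the pattern extends; the types are simplified stand-ins and setLevel is hypothetical, not an operator this PR adds:

```go
package main

import "fmt"

// Toy stand-ins for SegmentInfo and SegmentOperator.
type segment struct {
	maxRows int64
	level   int
}

type segmentOperator func(*segment) bool

// setMaxRows mirrors SetMaxRowCount: report false when the write is a no-op.
func setMaxRows(n int64) segmentOperator {
	return func(s *segment) bool {
		if s.maxRows == n {
			return false
		}
		s.maxRows = n
		return true
	}
}

// setLevel is a hypothetical second operator, showing how the pattern composes.
func setLevel(l int) segmentOperator {
	return func(s *segment) bool {
		if s.level == l {
			return false
		}
		s.level = l
		return true
	}
}

func main() {
	s := &segment{maxRows: 300}
	updated := false
	for _, op := range []segmentOperator{setMaxRows(4096), setLevel(1)} {
		updated = op(s) || updated
	}
	fmt.Println(updated, *s) // true {4096 1}
}
```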
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package datacoord
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/suite"
+
+    "github.com/milvus-io/milvus/internal/proto/datapb"
+)
+
+type TestSegmentOperatorSuite struct {
+    suite.Suite
+}
+
+func (s *TestSegmentOperatorSuite) TestSetMaxRowCount() {
+    segment := &SegmentInfo{
+        SegmentInfo: &datapb.SegmentInfo{
+            MaxRowNum: 300,
+        },
+    }
+
+    ops := SetMaxRowCount(20000)
+    updated := ops(segment)
+    s.Require().True(updated)
+    s.EqualValues(20000, segment.GetMaxRowNum())
+
+    updated = ops(segment)
+    s.False(updated)
+}
+
+func TestSegmentOperators(t *testing.T) {
+    suite.Run(t, new(TestSegmentOperatorSuite))
+}
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 0921d767d5..1a9a5c578e 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -304,6 +304,8 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
         }, nil
     }
 
+    segment = segment.Clone()
+
     err := binlog.DecompressBinLog(storage.InsertBinlog, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(), segment.GetBinlogs())
     if err != nil {
         return &datapb.GetInsertBinlogPathsResponse{
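Because GetHealthySegment and SelectSegments now return live meta pointers instead of clones, any handler that mutates a segment, as GetInsertBinlogPaths does when it decompresses binlog paths in place, must clone first; that is what the services.go hunk adds. A toy illustration of the aliasing hazard the Clone() call avoids (simplified types, not Milvus code):

```go
package main

import "fmt"

// seg stands in for SegmentInfo; paths stands in for its binlog paths.
type seg struct{ paths []string }

// clone copies the slice so edits to the copy cannot reach the original.
func (s *seg) clone() *seg {
	return &seg{paths: append([]string(nil), s.paths...)}
}

func main() {
	shared := &seg{paths: []string{"binlog.gz"}} // the live pointer held by meta

	local := shared.clone()
	local.paths[0] = "binlog" // "decompress" only the private copy

	fmt.Println(shared.paths[0], local.paths[0]) // binlog.gz binlog
}
```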