Make SegmentInfo.size atomic (#24457)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/24459/head
congqixia 2023-05-29 11:31:28 +08:00 committed by GitHub
parent c84bdcea49
commit 6375236533
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 16 deletions

View File

@ -331,7 +331,6 @@ func TestMeta_AddSegmentIndex(t *testing.T) {
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
size: 0,
lastWrittenTime: time.Time{},
},
},
@ -461,7 +460,6 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
size: 0,
lastWrittenTime: time.Time{},
},
},

View File

@ -237,7 +237,6 @@ func TestServer_GetIndexState(t *testing.T) {
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
size: 0,
lastWrittenTime: time.Time{},
},
}},
@ -307,7 +306,6 @@ func TestServer_GetIndexState(t *testing.T) {
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
size: 0,
lastWrittenTime: time.Time{},
},
}},
@ -433,7 +431,6 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
size: 0,
lastWrittenTime: time.Time{},
}
@ -468,7 +465,6 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
size: 0,
lastWrittenTime: time.Time{},
}
resp, err := s.GetSegmentIndexState(ctx, req)
@ -565,7 +561,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
size: 0,
lastWrittenTime: time.Time{},
},
},
@ -618,7 +613,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
size: 0,
lastWrittenTime: time.Time{},
},
},

View File

@ -375,7 +375,7 @@ func TestMeta_Basic(t *testing.T) {
segID0, err := mockAllocator.allocID(ctx)
assert.Nil(t, err)
segInfo0 := buildSegment(collID, partID0, segID0, channelName, false)
segInfo0.size = size0
segInfo0.size.Store(size0)
err = meta.AddSegment(segInfo0)
assert.Nil(t, err)
@ -383,7 +383,7 @@ func TestMeta_Basic(t *testing.T) {
segID1, err := mockAllocator.allocID(ctx)
assert.Nil(t, err)
segInfo1 := buildSegment(collID, partID0, segID1, channelName, false)
segInfo1.size = size1
segInfo1.size.Store(size1)
err = meta.AddSegment(segInfo1)
assert.Nil(t, err)
@ -455,7 +455,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
assert.Equal(t, len(updated.Statslogs[0].Binlogs), len(expected.Statslogs[0].Binlogs))
assert.Equal(t, len(updated.Deltalogs[0].Binlogs), len(expected.Deltalogs[0].Binlogs))
assert.Equal(t, updated.State, expected.State)
assert.Equal(t, updated.size, expected.size)
assert.Equal(t, updated.size.Load(), expected.size.Load())
assert.Equal(t, updated.NumOfRows, expected.NumOfRows)
})

View File

@ -20,6 +20,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
@ -43,7 +44,7 @@ type SegmentInfo struct {
lastFlushTime time.Time
isCompacting bool
// a cache to avoid calculate twice
size int64
size atomic.Int64
lastWrittenTime time.Time
}
@ -259,9 +260,9 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
size: s.size,
lastWrittenTime: s.lastWrittenTime,
}
cloned.size.Store(s.size.Load())
for _, opt := range opts {
opt(cloned)
@ -382,7 +383,7 @@ func addSegmentBinlogs(field2Binlogs map[UniqueID][]*datapb.Binlog) SegmentInfoO
}
func (s *SegmentInfo) getSegmentSize() int64 {
if s.size <= 0 {
if s.size.Load() <= 0 {
var size int64
for _, binlogs := range s.GetBinlogs() {
for _, l := range binlogs.GetBinlogs() {
@ -401,9 +402,11 @@ func (s *SegmentInfo) getSegmentSize() int64 {
size += l.GetLogSize()
}
}
s.size = size
if size > 0 {
s.size.Store(size)
}
}
return s.size
return s.size.Load()
}
// SegmentInfoSelector is the function type to select SegmentInfo from meta