diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index 8d157ce947..22fde4c276 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -21,12 +21,6 @@ import ( "go.uber.org/zap" ) -type clusterDeltaChange struct { - newNodes []string - offlines []string - restarts []string -} - // data node register func, simple func wrapping policy type dataNodeRegisterPolicy func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus) diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index 1186cc4f2e..40eaf49f0c 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -17,6 +17,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -81,9 +82,6 @@ type sealPolicy func(maxCount, writtenCount, allocatedCount int64) bool // segmentSealPolicy seal policy applies to segment type segmentSealPolicy func(segment *SegmentInfo, ts Timestamp) bool -// channelSealPolicy seal policy applies to channel -type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo - // getSegmentCapacityPolicy get segmentSealPolicy with segment size factor policy func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy { return func(segment *SegmentInfo, ts Timestamp) bool { @@ -96,12 +94,18 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy { } // getLastExpiresLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime -func getLastExpiresLifetimePolicy(lifetime uint64) segmentSealPolicy { +func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy { return func(segment *SegmentInfo, ts Timestamp) bool { - return (ts - segment.GetLastExpireTime()) > lifetime + pts, _ := tsoutil.ParseTS(ts) + epts, _ := tsoutil.ParseTS(segment.GetLastExpireTime()) + d := pts.Sub(epts) + return d >= lifetime } } +// channelSealPolicy seal policy applies to channel +type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo + // getChannelCapacityPolicy get channelSealPolicy with channel segment capacity policy func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy { return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo { @@ -121,10 +125,6 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) { }) } -func sealPolicyV1(maxCount, writtenCount, allocatedCount int64) bool { - return float64(writtenCount) >= Params.SegmentSealProportion*float64(maxCount) -} - type flushPolicy func(segment *SegmentInfo, t Timestamp) bool const flushInterval = 2 * time.Second diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go new file mode 100644 index 0000000000..6fed2e5a2b --- /dev/null +++ b/internal/datacoord/segment_allocation_policy_test.go @@ -0,0 +1,35 @@ +package datacoord + +import ( + "testing" + "time" + + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/stretchr/testify/assert" +) + +func TestSealSegmentPolicy(t *testing.T) { + t.Run("test seal segment by lifetime", func(t *testing.T) { + lifetime := 2 * time.Second + now := time.Now() + curTS := now.UnixNano() / int64(time.Millisecond) + nosealTs := (now.Add(lifetime / 2)).UnixNano() / int64(time.Millisecond) + sealTs := (now.Add(lifetime)).UnixNano() / int64(time.Millisecond) + + p := sealByLifetimePolicy(lifetime) + + segment := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + LastExpireTime: tsoutil.ComposeTS(curTS, 0), + }, + } + + shouldSeal := p(segment, tsoutil.ComposeTS(nosealTs, 0)) + assert.False(t, shouldSeal) + + shouldSeal = p(segment, tsoutil.ComposeTS(sealTs, 0)) + assert.True(t, shouldSeal) + }) +} diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index ff03c4c4d4..3e36c4e7ae 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -29,6 +29,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" ) +const segmentMaxLifetime = 24 * time.Hour + // Manager manage segment related operations. type Manager interface { // AllocSegment allocate rows and record the allocation. @@ -133,12 +135,11 @@ func defaultAlocatePolicy() AllocatePolicy { return AllocatePolicyV1 } -func defaultSealPolicy() sealPolicy { - return sealPolicyV1 -} - -func defaultSegmentSealPolicy() segmentSealPolicy { - return getSegmentCapacityPolicy(Params.SegmentSealProportion) +func defaultSegmentSealPolicy() []segmentSealPolicy { + return []segmentSealPolicy{ + sealByLifetimePolicy(segmentMaxLifetime), + getSegmentCapacityPolicy(Params.SegmentSealProportion), + } } func defaultFlushPolicy() flushPolicy { @@ -154,8 +155,8 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se segments: make([]UniqueID, 0), estimatePolicy: defaultCalUpperLimitPolicy(), allocPolicy: defaultAlocatePolicy(), - segmentSealPolicies: []segmentSealPolicy{defaultSegmentSealPolicy()}, // default only segment size policy - channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy + segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy + channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy flushPolicy: defaultFlushPolicy(), allocPool: sync.Pool{ New: func() interface{} {